mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-21 18:11:22 +00:00
Wire contexts to Apps controllers
This commit is contained in:
@@ -88,7 +88,7 @@ type ReplicaSetController struct {
|
||||
// It resumes normal action after observing the watch events for them.
|
||||
burstReplicas int
|
||||
// To allow injection of syncReplicaSet for testing.
|
||||
syncHandler func(rsKey string) error
|
||||
syncHandler func(ctx context.Context, rsKey string) error
|
||||
|
||||
// A TTLCache of pod creates/deletes each rc expects to see.
|
||||
expectations *controller.UIDTrackingControllerExpectations
|
||||
@@ -178,7 +178,7 @@ func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder)
|
||||
}
|
||||
|
||||
// Run begins watching and syncing.
|
||||
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
|
||||
func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer rsc.queue.ShutDown()
|
||||
|
||||
@@ -186,15 +186,15 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
|
||||
klog.Infof("Starting %v controller", controllerName)
|
||||
defer klog.Infof("Shutting down %v controller", controllerName)
|
||||
|
||||
if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
|
||||
if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(rsc.worker, time.Second, stopCh)
|
||||
go wait.UntilWithContext(ctx, rsc.worker, time.Second)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
// getReplicaSetsWithSameController returns a list of ReplicaSets with the same
|
||||
@@ -515,19 +515,19 @@ func (rsc *ReplicaSetController) deletePod(obj interface{}) {
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (rsc *ReplicaSetController) worker() {
|
||||
for rsc.processNextWorkItem() {
|
||||
func (rsc *ReplicaSetController) worker(ctx context.Context) {
|
||||
for rsc.processNextWorkItem(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
func (rsc *ReplicaSetController) processNextWorkItem() bool {
|
||||
func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool {
|
||||
key, quit := rsc.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer rsc.queue.Done(key)
|
||||
|
||||
err := rsc.syncHandler(key.(string))
|
||||
err := rsc.syncHandler(ctx, key.(string))
|
||||
if err == nil {
|
||||
rsc.queue.Forget(key)
|
||||
return true
|
||||
@@ -647,7 +647,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
|
||||
// syncReplicaSet will sync the ReplicaSet with the given key if it has had its expectations fulfilled,
|
||||
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
|
||||
// invoked concurrently with the same key.
|
||||
func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
|
||||
func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime))
|
||||
@@ -686,7 +686,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
|
||||
|
||||
// NOTE: filteredPods are pointing to objects from cache - if you need to
|
||||
// modify them, you need to copy it first.
|
||||
filteredPods, err = rsc.claimPods(rs, selector, filteredPods)
|
||||
filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -714,11 +714,11 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
|
||||
return manageReplicasErr
|
||||
}
|
||||
|
||||
func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
|
||||
func (rsc *ReplicaSetController) claimPods(ctx context.Context, rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
|
||||
// If any adoptions are attempted, we should first recheck for deletion with
|
||||
// an uncached quorum read sometime after listing Pods (see #42639).
|
||||
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
|
||||
fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{})
|
||||
canAdoptFunc := controller.RecheckDeletionTimestamp(func(ctx context.Context) (metav1.Object, error) {
|
||||
fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -728,7 +728,7 @@ func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.
|
||||
return fresh, nil
|
||||
})
|
||||
cm := controller.NewPodControllerRefManager(rsc.podControl, rs, selector, rsc.GroupVersionKind, canAdoptFunc)
|
||||
return cm.ClaimPods(filteredPods)
|
||||
return cm.ClaimPods(ctx, filteredPods)
|
||||
}
|
||||
|
||||
// slowStartBatch tries to call the provided function a total of 'count' times,
|
||||
|
@@ -185,12 +185,12 @@ func processSync(rsc *ReplicaSetController, key string) error {
|
||||
rsc.syncHandler = oldSyncHandler
|
||||
}()
|
||||
var syncErr error
|
||||
rsc.syncHandler = func(key string) error {
|
||||
syncErr = oldSyncHandler(key)
|
||||
rsc.syncHandler = func(ctx context.Context, key string) error {
|
||||
syncErr = oldSyncHandler(ctx, key)
|
||||
return syncErr
|
||||
}
|
||||
rsc.queue.Add(key)
|
||||
rsc.processNextWorkItem()
|
||||
rsc.processNextWorkItem(context.TODO())
|
||||
return syncErr
|
||||
}
|
||||
|
||||
@@ -224,7 +224,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
|
||||
newPodList(informers.Core().V1().Pods().Informer().GetIndexer(), 2, v1.PodRunning, labelMap, rsSpec, "pod")
|
||||
|
||||
manager.podControl = &fakePodControl
|
||||
manager.syncReplicaSet(GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -240,7 +240,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
manager.podControl = &fakePodControl
|
||||
|
||||
received := make(chan string)
|
||||
manager.syncHandler = func(key string) error {
|
||||
manager.syncHandler = func(ctx context.Context, key string) error {
|
||||
received <- key
|
||||
return nil
|
||||
}
|
||||
@@ -253,7 +253,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
|
||||
pods := newPodList(nil, 1, v1.PodRunning, labelMap, rsSpec, "pod")
|
||||
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
|
||||
|
||||
go manager.worker()
|
||||
go manager.worker(context.TODO())
|
||||
|
||||
expected := GetKey(rsSpec, t)
|
||||
select {
|
||||
@@ -282,7 +282,7 @@ func TestSyncReplicaSetCreateFailures(t *testing.T) {
|
||||
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)
|
||||
|
||||
manager.podControl = &fakePodControl
|
||||
manager.syncReplicaSet(GetKey(rs, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, fakePodControl.CreateLimit, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -324,7 +324,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
rsSpec.Status.Replicas = 1
|
||||
rsSpec.Status.ReadyReplicas = 1
|
||||
rsSpec.Status.AvailableReplicas = 1
|
||||
manager.syncReplicaSet(GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -335,7 +335,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
rsSpec.Status.ReadyReplicas = 0
|
||||
rsSpec.Status.AvailableReplicas = 0
|
||||
fakePodControl.Clear()
|
||||
manager.syncReplicaSet(GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -356,7 +356,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
fakePodControl.Clear()
|
||||
fakePodControl.Err = fmt.Errorf("fake Error")
|
||||
|
||||
manager.syncReplicaSet(GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -365,7 +365,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
// This replica should not need a Lowering of expectations, since the previous create failed
|
||||
fakePodControl.Clear()
|
||||
fakePodControl.Err = nil
|
||||
manager.syncReplicaSet(GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
err = validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -600,7 +600,7 @@ func TestWatchControllers(t *testing.T) {
|
||||
// The update sent through the fakeWatcher should make its way into the workqueue,
|
||||
// and eventually into the syncHandler. The handler validates the received controller
|
||||
// and closes the received channel to indicate that the test can finish.
|
||||
manager.syncHandler = func(key string) error {
|
||||
manager.syncHandler = func(ctx context.Context, key string) error {
|
||||
obj, exists, err := informers.Apps().V1().ReplicaSets().Informer().GetIndexer().GetByKey(key)
|
||||
if !exists || err != nil {
|
||||
t.Errorf("Expected to find replica set under key %v", key)
|
||||
@@ -614,7 +614,7 @@ func TestWatchControllers(t *testing.T) {
|
||||
}
|
||||
// Start only the ReplicaSet watcher and the workqueue, send a watch event,
|
||||
// and make sure it hits the sync method.
|
||||
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
|
||||
go wait.UntilWithContext(context.TODO(), manager.worker, 10*time.Millisecond)
|
||||
|
||||
testRSSpec.Name = "foo"
|
||||
fakeWatch.Add(&testRSSpec)
|
||||
@@ -645,7 +645,7 @@ func TestWatchPods(t *testing.T) {
|
||||
received := make(chan string)
|
||||
// The pod update sent through the fakeWatcher should figure out the managing ReplicaSet and
|
||||
// send it into the syncHandler.
|
||||
manager.syncHandler = func(key string) error {
|
||||
manager.syncHandler = func(ctx context.Context, key string) error {
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
t.Errorf("Error splitting key: %v", err)
|
||||
@@ -664,7 +664,7 @@ func TestWatchPods(t *testing.T) {
|
||||
// Start only the pod watcher and the workqueue, send a watch event,
|
||||
// and make sure it hits the sync method for the right ReplicaSet.
|
||||
go informers.Core().V1().Pods().Informer().Run(stopCh)
|
||||
go manager.Run(1, stopCh)
|
||||
go manager.Run(context.TODO(), 1)
|
||||
|
||||
pods := newPodList(nil, 1, v1.PodRunning, labelMap, testRSSpec, "pod")
|
||||
testPod := pods.Items[0]
|
||||
@@ -685,7 +685,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
|
||||
received := make(chan string)
|
||||
|
||||
manager.syncHandler = func(key string) error {
|
||||
manager.syncHandler = func(ctx context.Context, key string) error {
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
t.Errorf("Error splitting key: %v", err)
|
||||
@@ -698,7 +698,7 @@ func TestUpdatePods(t *testing.T) {
|
||||
return nil
|
||||
}
|
||||
|
||||
go wait.Until(manager.worker, 10*time.Millisecond, stopCh)
|
||||
go wait.UntilWithContext(context.TODO(), manager.worker, 10*time.Millisecond)
|
||||
|
||||
// Put 2 ReplicaSets and one pod into the informers
|
||||
labelMap1 := map[string]string{"foo": "bar"}
|
||||
@@ -829,7 +829,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
|
||||
// Enqueue once. Then process it. Disable rate-limiting for this.
|
||||
manager.queue = workqueue.NewRateLimitingQueue(workqueue.NewMaxOfRateLimiter())
|
||||
manager.enqueueRS(rs)
|
||||
manager.processNextWorkItem()
|
||||
manager.processNextWorkItem(context.TODO())
|
||||
// It should have been requeued.
|
||||
if got, want := manager.queue.Len(), 1; got != want {
|
||||
t.Errorf("queue.Len() = %v, want %v", got, want)
|
||||
@@ -909,7 +909,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rsSpec)
|
||||
|
||||
for i := 0; i < numReplicas; i += burstReplicas {
|
||||
manager.syncReplicaSet(GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
|
||||
// The store accrues active pods. It's also used by the ReplicaSet to determine how many
|
||||
// replicas to create.
|
||||
@@ -988,7 +988,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
|
||||
|
||||
// Check that the ReplicaSet didn't take any action for all the above pods
|
||||
fakePodControl.Clear()
|
||||
manager.syncReplicaSet(GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -1075,7 +1075,7 @@ func TestRSSyncExpectations(t *testing.T) {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(&postExpectationsPod)
|
||||
},
|
||||
})
|
||||
manager.syncReplicaSet(GetKey(rsSpec, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rsSpec, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -1095,7 +1095,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
|
||||
manager.podControl = &fakePodControl
|
||||
|
||||
// This should set expectations for the ReplicaSet
|
||||
manager.syncReplicaSet(GetKey(rs, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
err := validateSyncReplicaSet(&fakePodControl, 1, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -1116,7 +1116,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
|
||||
}
|
||||
informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Delete(rs)
|
||||
manager.deleteRS(rs)
|
||||
manager.syncReplicaSet(GetKey(rs, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
|
||||
_, exists, err = manager.expectations.GetExpectations(rsKey)
|
||||
if err != nil {
|
||||
@@ -1129,7 +1129,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
|
||||
// This should have no effect, since we've deleted the ReplicaSet.
|
||||
podExp.Add(-1, 0)
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Replace(make([]interface{}, 0), "0")
|
||||
manager.syncReplicaSet(GetKey(rs, t))
|
||||
manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
err = validateSyncReplicaSet(&fakePodControl, 0, 0, 0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@@ -1171,7 +1171,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
|
||||
t.Fatalf("initial RS didn't result in new item in the queue: %v", err)
|
||||
}
|
||||
|
||||
ok := manager.processNextWorkItem()
|
||||
ok := manager.processNextWorkItem(context.TODO())
|
||||
if !ok {
|
||||
t.Fatal("queue is shutting down")
|
||||
}
|
||||
@@ -1257,7 +1257,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
|
||||
t.Fatalf("Re-creating RS didn't result in new item in the queue: %v", err)
|
||||
}
|
||||
|
||||
ok = manager.processNextWorkItem()
|
||||
ok = manager.processNextWorkItem(context.TODO())
|
||||
if !ok {
|
||||
t.Fatal("Queue is shutting down!")
|
||||
}
|
||||
@@ -1457,7 +1457,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
|
||||
pod := newPod("pod", rs, v1.PodRunning, nil, true)
|
||||
pod.OwnerReferences = []metav1.OwnerReference{otherControllerReference}
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
|
||||
err := manager.syncReplicaSet(GetKey(rs, t))
|
||||
err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -1514,7 +1514,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
|
||||
|
||||
// no patch, no create
|
||||
err := manager.syncReplicaSet(GetKey(rs, t))
|
||||
err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -1543,7 +1543,7 @@ func TestDoNotAdoptOrCreateIfBeingDeletedRace(t *testing.T) {
|
||||
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
|
||||
|
||||
// sync should abort.
|
||||
err := manager.syncReplicaSet(GetKey(rs, t))
|
||||
err := manager.syncReplicaSet(context.TODO(), GetKey(rs, t))
|
||||
if err == nil {
|
||||
t.Error("syncReplicaSet() err = nil, expected non-nil")
|
||||
}
|
||||
|
Reference in New Issue
Block a user