diff --git a/pkg/controller/devicetainteviction/OWNERS b/pkg/controller/devicetainteviction/OWNERS new file mode 100644 index 00000000000..19c02ee0b10 --- /dev/null +++ b/pkg/controller/devicetainteviction/OWNERS @@ -0,0 +1,8 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: + - sig-scheduling-maintainers +reviewers: + - sig-scheduling +labels: + - sig/scheduling diff --git a/pkg/controller/devicetainteviction/doc.go b/pkg/controller/devicetainteviction/doc.go new file mode 100644 index 00000000000..aac6640aa28 --- /dev/null +++ b/pkg/controller/devicetainteviction/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package tainteviction contains the logic implementing taint-based eviction +// for Pods running on Nodes with NoExecute taints. +package tainteviction diff --git a/pkg/controller/devicetainteviction/metrics/metrics.go b/pkg/controller/devicetainteviction/metrics/metrics.go new file mode 100644 index 00000000000..600c22c81e7 --- /dev/null +++ b/pkg/controller/devicetainteviction/metrics/metrics.go @@ -0,0 +1,60 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const taintEvictionControllerSubsystem = "taint_eviction_controller" + +var ( + // PodDeletionsTotal counts the number of Pods deleted by TaintEvictionController since its start. + PodDeletionsTotal = metrics.NewCounter( + &metrics.CounterOpts{ + Subsystem: taintEvictionControllerSubsystem, + Name: "pod_deletions_total", + Help: "Total number of Pods deleted by TaintEvictionController since its start.", + StabilityLevel: metrics.ALPHA, + }, + ) + + // PodDeletionsLatency tracks the latency, in seconds, between the time when a taint effect has been activated + // for the Pod and its deletion. + PodDeletionsLatency = metrics.NewHistogram( + &metrics.HistogramOpts{ + Subsystem: taintEvictionControllerSubsystem, + Name: "pod_deletion_duration_seconds", + Help: "Latency, in seconds, between the time when a taint effect has been activated for the Pod and its deletion via TaintEvictionController.", + Buckets: []float64{0.005, 0.025, 0.1, 0.5, 1, 2.5, 10, 30, 60, 120, 180, 240}, // 5ms to 4m + StabilityLevel: metrics.ALPHA, + }, + ) +) + +var registerMetrics sync.Once + +// Register registers TaintEvictionController metrics. 
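// Illustrative sketch (not part of this change): how a caller is expected to use
// the metrics declared above. Register is idempotent thanks to sync.Once, and the
// histogram is documented in seconds, so callers would pass an already-converted
// value such as time.Since(fireAt).Seconds(). The helper name recordPodDeletion
// is hypothetical.
func recordPodDeletion(elapsedSeconds float64) {
	Register()
	PodDeletionsTotal.Inc()
	PodDeletionsLatency.Observe(elapsedSeconds)
}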
+func Register() { + registerMetrics.Do(func() { + legacyregistry.MustRegister(PodDeletionsTotal) + legacyregistry.MustRegister(PodDeletionsLatency) + }) +} diff --git a/pkg/controller/devicetainteviction/taint_eviction.go b/pkg/controller/devicetainteviction/taint_eviction.go new file mode 100644 index 00000000000..48ab6f0ec51 --- /dev/null +++ b/pkg/controller/devicetainteviction/taint_eviction.go @@ -0,0 +1,614 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tainteviction + +import ( + "context" + "fmt" + "hash/fnv" + "io" + "math" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + corev1informers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + apipod "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/apis/core/helper" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/controller/tainteviction/metrics" + controllerutil "k8s.io/kubernetes/pkg/controller/util/node" + utilpod "k8s.io/kubernetes/pkg/util/pod" +) + +const ( + // TODO (k82cn): Figure out a reasonable number of workers/channels and propagate + // the number of workers up making it a parameter of Run() function. + + // NodeUpdateChannelSize defines the size of channel for node update events. + NodeUpdateChannelSize = 10 + // UpdateWorkerSize defines the size of workers for node update or/and pod update. + UpdateWorkerSize = 8 + podUpdateChannelSize = 1 + retries = 5 +) + +type nodeUpdateItem struct { + nodeName string +} + +type podUpdateItem struct { + podName string + podNamespace string + nodeName string +} + +func hash(val string, max int) int { + hasher := fnv.New32a() + io.WriteString(hasher, val) + return int(hasher.Sum32() % uint32(max)) +} + +// GetPodsByNodeNameFunc returns the list of pods assigned to the specified node. +type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error) + +// Controller listens to Taint/Toleration changes and is responsible for removing Pods +// from Nodes tainted with NoExecute Taints. 
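// Illustrative sketch (not part of this change): the eviction flow the controller
// fields below support. TimedWorkerQueue, NewWorkArgs, and AddWork come from this
// package's timed worker queue (defined outside this diff); AddWork schedules the
// delete handler to fire at the given trigger time, CancelWork drops a pending
// eviction, and GetWorkerUnsafe inspects it. scheduleEvictionAfter is a
// hypothetical helper shown only to make the queue's role concrete.
func scheduleEvictionAfter(ctx context.Context, q *TimedWorkerQueue, pod types.NamespacedName, delay time.Duration) {
	now := time.Now()
	// createdAt == now, fireAt == now + delay (mirrors processPodOnNode below).
	q.AddWork(ctx, NewWorkArgs(pod.Name, pod.Namespace), now, now.Add(delay))
}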
+type Controller struct { + name string + + client clientset.Interface + broadcaster record.EventBroadcaster + recorder record.EventRecorder + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced + getPodsAssignedToNode GetPodsByNodeNameFunc + + taintEvictionQueue *TimedWorkerQueue + // keeps a map from nodeName to all noExecute taints on that Node + taintedNodesLock sync.Mutex + taintedNodes map[string][]v1.Taint + + nodeUpdateChannels []chan nodeUpdateItem + podUpdateChannels []chan podUpdateItem + + nodeUpdateQueue workqueue.TypedInterface[nodeUpdateItem] + podUpdateQueue workqueue.TypedInterface[podUpdateItem] +} + +func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName), controllerName string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error { + return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error { + ns := args.NamespacedName.Namespace + name := args.NamespacedName.Name + klog.FromContext(ctx).Info("Deleting pod", "controller", controllerName, "pod", args.NamespacedName) + if emitEventFunc != nil { + emitEventFunc(args.NamespacedName) + } + var err error + for i := 0; i < retries; i++ { + err = addConditionAndDeletePod(ctx, c, name, ns) + if err == nil { + metrics.PodDeletionsTotal.Inc() + metrics.PodDeletionsLatency.Observe(float64(time.Since(fireAt) * time.Second)) + break + } + time.Sleep(10 * time.Millisecond) + } + return err + } +} + +func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) { + pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return err + } + newStatus := pod.Status.DeepCopy() + updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{ + Type: v1.DisruptionTarget, + Status: v1.ConditionTrue, + Reason: "DeletionByTaintManager", + Message: "Taint manager: deleting due to NoExecute taint", + }) + if updated { + if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { + return err + } + } + return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{}) +} + +func getNoExecuteTaints(taints []v1.Taint) []v1.Taint { + result := []v1.Taint{} + for i := range taints { + if taints[i].Effect == v1.TaintEffectNoExecute { + result = append(result, taints[i]) + } + } + return result +} + +// getMinTolerationTime returns minimal toleration time from the given slice, or -1 if it's infinite. +func getMinTolerationTime(tolerations []v1.Toleration) time.Duration { + minTolerationTime := int64(math.MaxInt64) + if len(tolerations) == 0 { + return 0 + } + + for i := range tolerations { + if tolerations[i].TolerationSeconds != nil { + tolerationSeconds := *(tolerations[i].TolerationSeconds) + if tolerationSeconds <= 0 { + return 0 + } else if tolerationSeconds < minTolerationTime { + minTolerationTime = tolerationSeconds + } + } + } + + if minTolerationTime == int64(math.MaxInt64) { + return -1 + } + return time.Duration(minTolerationTime) * time.Second +} + +// New creates a new Controller that will use passed clientset to communicate with the API server. 
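// Illustrative sketch (not part of this change): both node and pod updates are
// sharded onto workers by hashing the node name, so all updates for a given node
// are handled by the same goroutine. That ordering is what lets handlePodUpdate
// read tc.taintedNodes written by handleNodeUpdate without extra synchronization
// beyond the mutex. workerFor is a hypothetical helper.
func workerFor(nodeName string) int {
	// Same input always yields the same worker index in [0, UpdateWorkerSize).
	return hash(nodeName, UpdateWorkerSize)
}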
+func New(ctx context.Context, c clientset.Interface, podInformer corev1informers.PodInformer, nodeInformer corev1informers.NodeInformer, controllerName string) (*Controller, error) { + logger := klog.FromContext(ctx) + metrics.Register() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) + + podIndexer := podInformer.Informer().GetIndexer() + + tm := &Controller{ + name: controllerName, + + client: c, + broadcaster: eventBroadcaster, + recorder: recorder, + podLister: podInformer.Lister(), + podListerSynced: podInformer.Informer().HasSynced, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + getPodsAssignedToNode: func(nodeName string) ([]*v1.Pod, error) { + objs, err := podIndexer.ByIndex("spec.nodeName", nodeName) + if err != nil { + return nil, err + } + pods := make([]*v1.Pod, 0, len(objs)) + for _, obj := range objs { + pod, ok := obj.(*v1.Pod) + if !ok { + continue + } + pods = append(pods, pod) + } + return pods, nil + }, + taintedNodes: make(map[string][]v1.Taint), + + nodeUpdateQueue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[nodeUpdateItem]{Name: "noexec_taint_node"}), + podUpdateQueue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[podUpdateItem]{Name: "noexec_taint_pod"}), + } + tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent, tm.name)) + + _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pod := obj.(*v1.Pod) + tm.PodUpdated(nil, pod) + }, + UpdateFunc: func(prev, obj interface{}) { + prevPod := prev.(*v1.Pod) + newPod := obj.(*v1.Pod) + tm.PodUpdated(prevPod, newPod) + }, + DeleteFunc: func(obj interface{}) { + pod, isPod := obj.(*v1.Pod) + // We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly. + if !isPod { + deletedState, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + logger.Error(nil, "Received unexpected object", "object", obj) + return + } + pod, ok = deletedState.Obj.(*v1.Pod) + if !ok { + logger.Error(nil, "DeletedFinalStateUnknown contained non-Pod object", "object", deletedState.Obj) + return + } + } + tm.PodUpdated(pod, nil) + }, + }) + if err != nil { + return nil, fmt.Errorf("unable to add pod event handler: %w", err) + } + + _, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error { + tm.NodeUpdated(nil, node) + return nil + }), + UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error { + tm.NodeUpdated(oldNode, newNode) + return nil + }), + DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error { + tm.NodeUpdated(node, nil) + return nil + }), + }) + if err != nil { + return nil, fmt.Errorf("unable to add node event handler: %w", err) + } + + return tm, nil +} + +// Run starts the controller which will run in loop until `stopCh` is closed. +func (tc *Controller) Run(ctx context.Context) { + defer utilruntime.HandleCrash() + logger := klog.FromContext(ctx) + logger.Info("Starting", "controller", tc.name) + defer logger.Info("Shutting down controller", "controller", tc.name) + + // Start events processing pipeline. 
+ tc.broadcaster.StartStructuredLogging(3) + if tc.client != nil { + logger.Info("Sending events to api server") + tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")}) + } else { + logger.Error(nil, "kubeClient is nil", "controller", tc.name) + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + defer tc.broadcaster.Shutdown() + defer tc.nodeUpdateQueue.ShutDown() + defer tc.podUpdateQueue.ShutDown() + + // wait for the cache to be synced + if !cache.WaitForNamedCacheSync(tc.name, ctx.Done(), tc.podListerSynced, tc.nodeListerSynced) { + return + } + + for i := 0; i < UpdateWorkerSize; i++ { + tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize)) + tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize)) + } + + // Functions that are responsible for taking work items out of the workqueues and putting them + // into channels. + go func(stopCh <-chan struct{}) { + for { + nodeUpdate, shutdown := tc.nodeUpdateQueue.Get() + if shutdown { + break + } + hash := hash(nodeUpdate.nodeName, UpdateWorkerSize) + select { + case <-stopCh: + tc.nodeUpdateQueue.Done(nodeUpdate) + return + case tc.nodeUpdateChannels[hash] <- nodeUpdate: + // tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker + } + } + }(ctx.Done()) + + go func(stopCh <-chan struct{}) { + for { + podUpdate, shutdown := tc.podUpdateQueue.Get() + if shutdown { + break + } + // The fact that pods are processed by the same worker as nodes is used to avoid races + // between node worker setting tc.taintedNodes and pod worker reading this to decide + // whether to delete pod. + // It's possible that even without this assumption this code is still correct. + hash := hash(podUpdate.nodeName, UpdateWorkerSize) + select { + case <-stopCh: + tc.podUpdateQueue.Done(podUpdate) + return + case tc.podUpdateChannels[hash] <- podUpdate: + // tc.podUpdateQueue.Done is called by the podUpdateChannels worker + } + } + }(ctx.Done()) + + wg := sync.WaitGroup{} + wg.Add(UpdateWorkerSize) + for i := 0; i < UpdateWorkerSize; i++ { + go tc.worker(ctx, i, wg.Done, ctx.Done()) + } + wg.Wait() +} + +func (tc *Controller) worker(ctx context.Context, worker int, done func(), stopCh <-chan struct{}) { + defer done() + + // When processing events we want to prioritize Node updates over Pod updates, + // as NodeUpdates that interest the controller should be handled as soon as possible - + // we don't want user (or system) to wait until PodUpdate queue is drained before it can + // start evicting Pods from tainted Nodes. + for { + select { + case <-stopCh: + return + case nodeUpdate := <-tc.nodeUpdateChannels[worker]: + tc.handleNodeUpdate(ctx, nodeUpdate) + tc.nodeUpdateQueue.Done(nodeUpdate) + case podUpdate := <-tc.podUpdateChannels[worker]: + // If we found a Pod update we need to empty Node queue first. + priority: + for { + select { + case nodeUpdate := <-tc.nodeUpdateChannels[worker]: + tc.handleNodeUpdate(ctx, nodeUpdate) + tc.nodeUpdateQueue.Done(nodeUpdate) + default: + break priority + } + } + // After Node queue is emptied we process podUpdate. + tc.handlePodUpdate(ctx, podUpdate) + tc.podUpdateQueue.Done(podUpdate) + } + } +} + +// PodUpdated is used to notify NoExecuteTaintManager about Pod changes. 
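// Illustrative sketch (not part of this change): how a caller wires up and runs
// the controller, mirroring setupNewController in the tests. It assumes the
// k8s.io/client-go/informers package is imported for SharedInformerFactory; the
// factory is started so the pod and node informers feed the handlers registered
// in New, and Run blocks until the context is cancelled. runTaintEviction is a
// hypothetical helper.
func runTaintEviction(ctx context.Context, client clientset.Interface, factory informers.SharedInformerFactory) error {
	tc, err := New(ctx, client, factory.Core().V1().Pods(), factory.Core().V1().Nodes(), "taint-eviction-controller")
	if err != nil {
		return err
	}
	factory.Start(ctx.Done())
	tc.Run(ctx)
	return nil
}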
+func (tc *Controller) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) { + podName := "" + podNamespace := "" + nodeName := "" + oldTolerations := []v1.Toleration{} + if oldPod != nil { + podName = oldPod.Name + podNamespace = oldPod.Namespace + nodeName = oldPod.Spec.NodeName + oldTolerations = oldPod.Spec.Tolerations + } + newTolerations := []v1.Toleration{} + if newPod != nil { + podName = newPod.Name + podNamespace = newPod.Namespace + nodeName = newPod.Spec.NodeName + newTolerations = newPod.Spec.Tolerations + } + + if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName { + return + } + updateItem := podUpdateItem{ + podName: podName, + podNamespace: podNamespace, + nodeName: nodeName, + } + + tc.podUpdateQueue.Add(updateItem) +} + +// NodeUpdated is used to notify NoExecuteTaintManager about Node changes. +func (tc *Controller) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) { + nodeName := "" + oldTaints := []v1.Taint{} + if oldNode != nil { + nodeName = oldNode.Name + oldTaints = getNoExecuteTaints(oldNode.Spec.Taints) + } + + newTaints := []v1.Taint{} + if newNode != nil { + nodeName = newNode.Name + newTaints = getNoExecuteTaints(newNode.Spec.Taints) + } + + if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) { + return + } + updateItem := nodeUpdateItem{ + nodeName: nodeName, + } + + tc.nodeUpdateQueue.Add(updateItem) +} + +func (tc *Controller) cancelWorkWithEvent(logger klog.Logger, nsName types.NamespacedName) { + if tc.taintEvictionQueue.CancelWork(logger, nsName.String()) { + tc.emitCancelPodDeletionEvent(nsName) + } +} + +func (tc *Controller) processPodOnNode( + ctx context.Context, + podNamespacedName types.NamespacedName, + nodeName string, + tolerations []v1.Toleration, + taints []v1.Taint, + now time.Time, +) { + logger := klog.FromContext(ctx) + if len(taints) == 0 { + tc.cancelWorkWithEvent(logger, podNamespacedName) + } + allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations) + if !allTolerated { + logger.V(2).Info("Not all taints are tolerated after update for pod on node", "pod", podNamespacedName.String(), "node", klog.KRef("", nodeName)) + // We're canceling scheduled work (if any), as we're going to delete the Pod right away. + tc.cancelWorkWithEvent(logger, podNamespacedName) + tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), now, now) + return + } + minTolerationTime := getMinTolerationTime(usedTolerations) + // getMinTolerationTime returns negative value to denote infinite toleration. 
+ if minTolerationTime < 0 { + logger.V(4).Info("Current tolerations for pod tolerate forever, cancelling any scheduled deletion", "pod", podNamespacedName.String()) + tc.cancelWorkWithEvent(logger, podNamespacedName) + return + } + + startTime := now + triggerTime := startTime.Add(minTolerationTime) + scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) + if scheduledEviction != nil { + startTime = scheduledEviction.CreatedAt + if startTime.Add(minTolerationTime).Before(triggerTime) { + return + } + tc.cancelWorkWithEvent(logger, podNamespacedName) + } + tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime) +} + +func (tc *Controller) handlePodUpdate(ctx context.Context, podUpdate podUpdateItem) { + pod, err := tc.podLister.Pods(podUpdate.podNamespace).Get(podUpdate.podName) + logger := klog.FromContext(ctx) + if err != nil { + if apierrors.IsNotFound(err) { + // Delete + podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName} + logger.V(4).Info("Noticed pod deletion", "pod", podNamespacedName) + tc.cancelWorkWithEvent(logger, podNamespacedName) + return + } + utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err)) + return + } + + // We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object. + if pod.Spec.NodeName != podUpdate.nodeName { + return + } + + // Create or Update + podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + logger.V(4).Info("Noticed pod update", "pod", podNamespacedName) + nodeName := pod.Spec.NodeName + if nodeName == "" { + return + } + taints, ok := func() ([]v1.Taint, bool) { + tc.taintedNodesLock.Lock() + defer tc.taintedNodesLock.Unlock() + taints, ok := tc.taintedNodes[nodeName] + return taints, ok + }() + // It's possible that Node was deleted, or Taints were removed before, which triggered + // eviction cancelling if it was needed. + if !ok { + return + } + tc.processPodOnNode(ctx, podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now()) +} + +func (tc *Controller) handleNodeUpdate(ctx context.Context, nodeUpdate nodeUpdateItem) { + node, err := tc.nodeLister.Get(nodeUpdate.nodeName) + logger := klog.FromContext(ctx) + if err != nil { + if apierrors.IsNotFound(err) { + // Delete + logger.V(4).Info("Noticed node deletion", "node", klog.KRef("", nodeUpdate.nodeName)) + tc.taintedNodesLock.Lock() + defer tc.taintedNodesLock.Unlock() + delete(tc.taintedNodes, nodeUpdate.nodeName) + return + } + utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err)) + return + } + + // Create or Update + logger.V(4).Info("Noticed node update", "node", klog.KObj(node)) + taints := getNoExecuteTaints(node.Spec.Taints) + func() { + tc.taintedNodesLock.Lock() + defer tc.taintedNodesLock.Unlock() + logger.V(4).Info("Updating known taints on node", "node", klog.KObj(node), "taints", taints) + if len(taints) == 0 { + delete(tc.taintedNodes, node.Name) + } else { + tc.taintedNodes[node.Name] = taints + } + }() + + // This is critical that we update tc.taintedNodes before we call getPodsAssignedToNode: + // getPodsAssignedToNode can be delayed as long as all future updates to pods will call + // tc.PodUpdated which will use tc.taintedNodes to potentially delete delayed pods. 
+ pods, err := tc.getPodsAssignedToNode(node.Name) + if err != nil { + logger.Error(err, "Failed to get pods assigned to node", "node", klog.KObj(node)) + return + } + if len(pods) == 0 { + return + } + // Short circuit, to make this controller a bit faster. + if len(taints) == 0 { + logger.V(4).Info("All taints were removed from the node. Cancelling all evictions...", "node", klog.KObj(node)) + for i := range pods { + tc.cancelWorkWithEvent(logger, types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name}) + } + return + } + + now := time.Now() + for _, pod := range pods { + podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + tc.processPodOnNode(ctx, podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now) + } +} + +func (tc *Controller) emitPodDeletionEvent(nsName types.NamespacedName) { + if tc.recorder == nil { + return + } + ref := &v1.ObjectReference{ + APIVersion: "v1", + Kind: "Pod", + Name: nsName.Name, + Namespace: nsName.Namespace, + } + tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String()) +} + +func (tc *Controller) emitCancelPodDeletionEvent(nsName types.NamespacedName) { + if tc.recorder == nil { + return + } + ref := &v1.ObjectReference{ + APIVersion: "v1", + Kind: "Pod", + Name: nsName.Name, + Namespace: nsName.Namespace, + } + tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String()) +} diff --git a/pkg/controller/devicetainteviction/taint_eviction_test.go b/pkg/controller/devicetainteviction/taint_eviction_test.go new file mode 100644 index 00000000000..6c933d5f23d --- /dev/null +++ b/pkg/controller/devicetainteviction/taint_eviction_test.go @@ -0,0 +1,941 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package tainteviction + +import ( + "context" + "fmt" + goruntime "runtime" + "sort" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/controller/testutil" +) + +var timeForControllerToProgressForSanityCheck = 20 * time.Millisecond + +func getPodsAssignedToNode(ctx context.Context, c *fake.Clientset) GetPodsByNodeNameFunc { + return func(nodeName string) ([]*corev1.Pod, error) { + selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}) + pods, err := c.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{ + FieldSelector: selector.String(), + LabelSelector: labels.Everything().String(), + }) + if err != nil { + return []*corev1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName) + } + rPods := make([]*corev1.Pod, len(pods.Items)) + for i := range pods.Items { + rPods[i] = &pods.Items[i] + } + return rPods, nil + } +} + +func createNoExecuteTaint(index int) corev1.Taint { + now := metav1.Now() + return corev1.Taint{ + Key: "testTaint" + fmt.Sprintf("%v", index), + Value: "test" + fmt.Sprintf("%v", index), + Effect: corev1.TaintEffectNoExecute, + TimeAdded: &now, + } +} + +func addToleration(pod *corev1.Pod, index int, duration int64) *corev1.Pod { + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + if duration < 0 { + pod.Spec.Tolerations = []corev1.Toleration{{Key: "testTaint" + fmt.Sprintf("%v", index), Value: "test" + fmt.Sprintf("%v", index), Effect: corev1.TaintEffectNoExecute}} + + } else { + pod.Spec.Tolerations = []corev1.Toleration{{Key: "testTaint" + fmt.Sprintf("%v", index), Value: "test" + fmt.Sprintf("%v", index), Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &duration}} + } + return pod +} + +func addTaintsToNode(node *corev1.Node, key, value string, indices []int) *corev1.Node { + taints := []corev1.Taint{} + for _, index := range indices { + taints = append(taints, createNoExecuteTaint(index)) + } + node.Spec.Taints = taints + return node +} + +var alwaysReady = func() bool { return true } + +func setupNewController(ctx context.Context, fakeClientSet *fake.Clientset) (*Controller, cache.Indexer, cache.Indexer) { + informerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0) + podIndexer := informerFactory.Core().V1().Pods().Informer().GetIndexer() + nodeIndexer := informerFactory.Core().V1().Nodes().Informer().GetIndexer() + mgr, _ := New(ctx, fakeClientSet, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), "taint-eviction-controller") + mgr.podListerSynced = alwaysReady + mgr.nodeListerSynced = alwaysReady + mgr.getPodsAssignedToNode = getPodsAssignedToNode(ctx, fakeClientSet) + return mgr, podIndexer, nodeIndexer +} + +type timestampedPod struct { + names []string + timestamp time.Duration +} + +type durationSlice []timestampedPod + +func (a durationSlice) Len() int { return len(a) } +func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a durationSlice) Less(i, j int) bool { return a[i].timestamp < a[j].timestamp } + +func TestFilterNoExecuteTaints(t *testing.T) { + taints := []corev1.Taint{ + { + Key: "one", + Value: "one", + Effect: 
corev1.TaintEffectNoExecute, + }, + { + Key: "two", + Value: "two", + Effect: corev1.TaintEffectNoSchedule, + }, + } + taints = getNoExecuteTaints(taints) + if len(taints) != 1 || taints[0].Key != "one" { + t.Errorf("Filtering doesn't work. Got %v", taints) + } +} + +func TestCreatePod(t *testing.T) { + testCases := []struct { + description string + pod *corev1.Pod + taintedNodes map[string][]corev1.Taint + expectPatch bool + expectDelete bool + }{ + { + description: "not scheduled - ignore", + pod: testutil.NewPod("pod1", ""), + taintedNodes: map[string][]corev1.Taint{}, + expectDelete: false, + }, + { + description: "scheduled on untainted Node", + pod: testutil.NewPod("pod1", "node1"), + taintedNodes: map[string][]corev1.Taint{}, + expectDelete: false, + }, + { + description: "schedule on tainted Node", + pod: testutil.NewPod("pod1", "node1"), + taintedNodes: map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectPatch: true, + expectDelete: true, + }, + { + description: "schedule on tainted Node with finite toleration", + pod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100), + taintedNodes: map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectDelete: false, + }, + { + description: "schedule on tainted Node with infinite toleration", + pod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1), + taintedNodes: map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectDelete: false, + }, + { + description: "schedule on tainted Node with infinite invalid toleration", + pod: addToleration(testutil.NewPod("pod1", "node1"), 2, -1), + taintedNodes: map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectPatch: true, + expectDelete: true, + }, + } + + for _, item := range testCases { + t.Run(item.description, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: []corev1.Pod{*item.pod}}) + controller, podIndexer, _ := setupNewController(ctx, fakeClientset) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(ctx) + controller.taintedNodes = item.taintedNodes + + podIndexer.Add(item.pod) + controller.PodUpdated(nil, item.pod) + + verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) + + cancel() + }) + } +} + +func TestDeletePod(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fakeClientset := fake.NewSimpleClientset() + controller, _, _ := setupNewController(ctx, fakeClientset) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(ctx) + controller.taintedNodes = map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + } + controller.PodUpdated(testutil.NewPod("pod1", "node1"), nil) + // wait a bit to see if nothing will panic + time.Sleep(timeForControllerToProgressForSanityCheck) +} + +func TestUpdatePod(t *testing.T) { + testCases := []struct { + description string + prevPod *corev1.Pod + awaitForScheduledEviction bool + newPod *corev1.Pod + taintedNodes map[string][]corev1.Taint + expectPatch bool + expectDelete bool + skipOnWindows bool + }{ + { + description: "scheduling onto tainted Node", + prevPod: testutil.NewPod("pod1", ""), + newPod: testutil.NewPod("pod1", "node1"), + taintedNodes: map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectPatch: true, + expectDelete: true, + }, + { + description: "scheduling onto tainted Node with toleration", + 
prevPod: addToleration(testutil.NewPod("pod1", ""), 1, -1), + newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1), + taintedNodes: map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectDelete: false, + }, + { + description: "removing toleration", + prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100), + newPod: testutil.NewPod("pod1", "node1"), + awaitForScheduledEviction: true, + taintedNodes: map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectPatch: true, + expectDelete: true, + }, + { + description: "lengthening toleration shouldn't work", + prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 1), + newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100), + awaitForScheduledEviction: true, + taintedNodes: map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectPatch: true, + expectDelete: true, + skipOnWindows: true, + }, + } + + for _, item := range testCases { + t.Run(item.description, func(t *testing.T) { + if item.skipOnWindows && goruntime.GOOS == "windows" { + // TODO: remove skip once the flaking test has been fixed. + t.Skip("Skip flaking test on Windows.") + } + ctx, cancel := context.WithCancel(context.Background()) + fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: []corev1.Pod{*item.prevPod}}) + controller, podIndexer, _ := setupNewController(context.TODO(), fakeClientset) + controller.recorder = testutil.NewFakeRecorder() + controller.taintedNodes = item.taintedNodes + go controller.Run(ctx) + + podIndexer.Add(item.prevPod) + controller.PodUpdated(nil, item.prevPod) + + if item.awaitForScheduledEviction { + nsName := types.NamespacedName{Namespace: item.prevPod.Namespace, Name: item.prevPod.Name} + err := wait.PollImmediate(time.Millisecond*10, time.Second, func() (bool, error) { + scheduledEviction := controller.taintEvictionQueue.GetWorkerUnsafe(nsName.String()) + return scheduledEviction != nil, nil + }) + if err != nil { + t.Fatalf("Failed to await for scheduled eviction: %q", err) + } + } + + podIndexer.Update(item.newPod) + controller.PodUpdated(item.prevPod, item.newPod) + + verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) + cancel() + }) + } +} + +func TestCreateNode(t *testing.T) { + testCases := []struct { + description string + pods []corev1.Pod + node *corev1.Node + expectPatch bool + expectDelete bool + }{ + { + description: "Creating Node matching already assigned Pod", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + node: testutil.NewNode("node1"), + expectPatch: false, + expectDelete: false, + }, + { + description: "Creating tainted Node matching already assigned Pod", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectPatch: true, + expectDelete: true, + }, + { + description: "Creating tainted Node matching already assigned tolerating Pod", + pods: []corev1.Pod{ + *addToleration(testutil.NewPod("pod1", "node1"), 1, -1), + }, + node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectPatch: false, + expectDelete: false, + }, + } + + for _, item := range testCases { + ctx, cancel := context.WithCancel(context.Background()) + fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods}) + controller, _, nodeIndexer := setupNewController(ctx, fakeClientset) + nodeIndexer.Add(item.node) + controller.recorder = 
testutil.NewFakeRecorder() + go controller.Run(ctx) + controller.NodeUpdated(nil, item.node) + + verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) + + cancel() + } +} + +func TestDeleteNode(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + fakeClientset := fake.NewSimpleClientset() + controller, _, _ := setupNewController(ctx, fakeClientset) + controller.recorder = testutil.NewFakeRecorder() + controller.taintedNodes = map[string][]corev1.Taint{ + "node1": {createNoExecuteTaint(1)}, + } + go controller.Run(ctx) + controller.NodeUpdated(testutil.NewNode("node1"), nil) + + // await until controller.taintedNodes is empty + err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) { + controller.taintedNodesLock.Lock() + defer controller.taintedNodesLock.Unlock() + _, ok := controller.taintedNodes["node1"] + return !ok, nil + }) + if err != nil { + t.Errorf("Failed to await for processing node deleted: %q", err) + } + cancel() +} + +func TestUpdateNode(t *testing.T) { + testCases := []struct { + description string + pods []corev1.Pod + oldNode *corev1.Node + newNode *corev1.Node + expectPatch bool + expectDelete bool + additionalSleep time.Duration + }{ + { + description: "Added taint, expect node patched and deleted", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectPatch: true, + expectDelete: true, + }, + { + description: "Added tolerated taint", + pods: []corev1.Pod{ + *addToleration(testutil.NewPod("pod1", "node1"), 1, 100), + }, + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectDelete: false, + }, + { + description: "Only one added taint tolerated", + pods: []corev1.Pod{ + *addToleration(testutil.NewPod("pod1", "node1"), 1, 100), + }, + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}), + expectPatch: true, + expectDelete: true, + }, + { + description: "Taint removed", + pods: []corev1.Pod{ + *addToleration(testutil.NewPod("pod1", "node1"), 1, 1), + }, + oldNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + newNode: testutil.NewNode("node1"), + expectDelete: false, + additionalSleep: 1500 * time.Millisecond, + }, + { + description: "Pod with multiple tolerations are evicted when first one runs out", + pods: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + }, + Spec: corev1.PodSpec{ + NodeName: "node1", + Tolerations: []corev1.Toleration{ + {Key: "testTaint1", Value: "test1", Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &[]int64{1}[0]}, + {Key: "testTaint2", Value: "test2", Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &[]int64{100}[0]}, + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + }, + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}), + expectPatch: true, + expectDelete: true, + }, + } + + for _, item := range testCases { + t.Run(item.description, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: 
item.pods}) + controller, _, nodeIndexer := setupNewController(ctx, fakeClientset) + nodeIndexer.Add(item.newNode) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(ctx) + controller.NodeUpdated(item.oldNode, item.newNode) + + if item.additionalSleep > 0 { + time.Sleep(item.additionalSleep) + } + + verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) + }) + } +} + +func TestUpdateNodeWithMultipleTaints(t *testing.T) { + taint1 := createNoExecuteTaint(1) + taint2 := createNoExecuteTaint(2) + + minute := int64(60) + pod := testutil.NewPod("pod1", "node1") + pod.Spec.Tolerations = []corev1.Toleration{ + {Key: taint1.Key, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoExecute}, + {Key: taint2.Key, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &minute}, + } + podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + + untaintedNode := testutil.NewNode("node1") + + doubleTaintedNode := testutil.NewNode("node1") + doubleTaintedNode.Spec.Taints = []corev1.Taint{taint1, taint2} + + singleTaintedNode := testutil.NewNode("node1") + singleTaintedNode.Spec.Taints = []corev1.Taint{taint1} + + ctx, cancel := context.WithCancel(context.TODO()) + fakeClientset := fake.NewSimpleClientset(pod) + controller, _, nodeIndexer := setupNewController(ctx, fakeClientset) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(ctx) + + // no taint + nodeIndexer.Add(untaintedNode) + controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"}) + // verify pod is not queued for deletion + if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil { + t.Fatalf("pod queued for deletion with no taints") + } + + // no taint -> infinitely tolerated taint + nodeIndexer.Update(singleTaintedNode) + controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"}) + // verify pod is not queued for deletion + if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil { + t.Fatalf("pod queued for deletion with permanently tolerated taint") + } + + // infinitely tolerated taint -> temporarily tolerated taint + nodeIndexer.Update(doubleTaintedNode) + controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"}) + // verify pod is queued for deletion + if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) == nil { + t.Fatalf("pod not queued for deletion after addition of temporarily tolerated taint") + } + + // temporarily tolerated taint -> infinitely tolerated taint + nodeIndexer.Update(singleTaintedNode) + controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"}) + // verify pod is not queued for deletion + if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil { + t.Fatalf("pod queued for deletion after removal of temporarily tolerated taint") + } + + // verify pod is not deleted + for _, action := range fakeClientset.Actions() { + if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { + t.Error("Unexpected deletion") + } + } + cancel() +} + +func TestUpdateNodeWithMultiplePods(t *testing.T) { + testCases := []struct { + description string + pods []corev1.Pod + oldNode *corev1.Node + newNode *corev1.Node + expectedDeleteTimes durationSlice + }{ + { + description: "Pods with different toleration times are evicted appropriately", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + *addToleration(testutil.NewPod("pod2", "node1"), 1, 1), + 
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1), + }, + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectedDeleteTimes: durationSlice{ + {[]string{"pod1"}, 0}, + {[]string{"pod2"}, time.Second}, + }, + }, + { + description: "Evict all pods not matching all taints instantly", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + *addToleration(testutil.NewPod("pod2", "node1"), 1, 1), + *addToleration(testutil.NewPod("pod3", "node1"), 1, -1), + }, + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}), + expectedDeleteTimes: durationSlice{ + {[]string{"pod1", "pod2", "pod3"}, 0}, + }, + }, + } + + for _, item := range testCases { + t.Run(item.description, func(t *testing.T) { + t.Logf("Starting testcase %q", item.description) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods}) + sort.Sort(item.expectedDeleteTimes) + controller, _, nodeIndexer := setupNewController(ctx, fakeClientset) + nodeIndexer.Add(item.newNode) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(ctx) + controller.NodeUpdated(item.oldNode, item.newNode) + + startedAt := time.Now() + for i := range item.expectedDeleteTimes { + if i == 0 || item.expectedDeleteTimes[i-1].timestamp != item.expectedDeleteTimes[i].timestamp { + // compute a grace duration to give controller time to process updates. Choose big + // enough intervals in the test cases above to avoid flakes. + var increment time.Duration + if i == len(item.expectedDeleteTimes)-1 || item.expectedDeleteTimes[i+1].timestamp == item.expectedDeleteTimes[i].timestamp { + increment = 500 * time.Millisecond + } else { + increment = ((item.expectedDeleteTimes[i+1].timestamp - item.expectedDeleteTimes[i].timestamp) / time.Duration(2)) + } + + sleepTime := item.expectedDeleteTimes[i].timestamp - time.Since(startedAt) + increment + if sleepTime < 0 { + sleepTime = 0 + } + t.Logf("Sleeping for %v", sleepTime) + time.Sleep(sleepTime) + } + + for delay, podName := range item.expectedDeleteTimes[i].names { + deleted := false + for _, action := range fakeClientset.Actions() { + deleteAction, ok := action.(clienttesting.DeleteActionImpl) + if !ok { + t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb()) + continue + } + if deleteAction.GetResource().Resource != "pods" { + continue + } + if podName == deleteAction.GetName() { + deleted = true + } + } + if !deleted { + t.Errorf("Failed to deleted pod %v after %v", podName, delay) + } + } + for _, action := range fakeClientset.Actions() { + deleteAction, ok := action.(clienttesting.DeleteActionImpl) + if !ok { + t.Logf("Found not-delete action with verb %v. 
Ignoring.", action.GetVerb()) + continue + } + if deleteAction.GetResource().Resource != "pods" { + continue + } + deletedPodName := deleteAction.GetName() + expected := false + for _, podName := range item.expectedDeleteTimes[i].names { + if podName == deletedPodName { + expected = true + } + } + if !expected { + t.Errorf("Pod %v was deleted even though it shouldn't have", deletedPodName) + } + } + fakeClientset.ClearActions() + } + }) + } +} + +func TestGetMinTolerationTime(t *testing.T) { + one := int64(1) + two := int64(2) + oneSec := 1 * time.Second + + tests := []struct { + tolerations []corev1.Toleration + expected time.Duration + }{ + { + tolerations: []corev1.Toleration{}, + expected: 0, + }, + { + tolerations: []corev1.Toleration{ + { + TolerationSeconds: nil, + }, + }, + expected: -1, + }, + { + tolerations: []corev1.Toleration{ + { + TolerationSeconds: &one, + }, + { + TolerationSeconds: &two, + }, + }, + expected: oneSec, + }, + + { + tolerations: []corev1.Toleration{ + { + TolerationSeconds: &one, + }, + { + TolerationSeconds: nil, + }, + }, + expected: oneSec, + }, + { + tolerations: []corev1.Toleration{ + { + TolerationSeconds: nil, + }, + { + TolerationSeconds: &one, + }, + }, + expected: oneSec, + }, + } + + for _, test := range tests { + got := getMinTolerationTime(test.tolerations) + if got != test.expected { + t.Errorf("Incorrect min toleration time: got %v, expected %v", got, test.expected) + } + } +} + +// TestEventualConsistency verifies if getPodsAssignedToNode returns incomplete data +// (e.g. due to watch latency), it will reconcile the remaining pods eventually. +// This scenario is partially covered by TestUpdatePods, but given this is an important +// property of TaintManager, it's better to have explicit test for this. 
+func TestEventualConsistency(t *testing.T) { + testCases := []struct { + description string + pods []corev1.Pod + prevPod *corev1.Pod + newPod *corev1.Pod + oldNode *corev1.Node + newNode *corev1.Node + expectPatch bool + expectDelete bool + }{ + { + description: "existing pod2 scheduled onto tainted Node", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + prevPod: testutil.NewPod("pod2", ""), + newPod: testutil.NewPod("pod2", "node1"), + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectPatch: true, + expectDelete: true, + }, + { + description: "existing pod2 with taint toleration scheduled onto tainted Node", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + prevPod: addToleration(testutil.NewPod("pod2", ""), 1, 100), + newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100), + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectPatch: true, + expectDelete: true, + }, + { + description: "new pod2 created on tainted Node", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + prevPod: nil, + newPod: testutil.NewPod("pod2", "node1"), + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectPatch: true, + expectDelete: true, + }, + { + description: "new pod2 with tait toleration created on tainted Node", + pods: []corev1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + prevPod: nil, + newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100), + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectPatch: true, + expectDelete: true, + }, + } + + for _, item := range testCases { + t.Run(item.description, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods}) + controller, podIndexer, nodeIndexer := setupNewController(ctx, fakeClientset) + nodeIndexer.Add(item.newNode) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(ctx) + + if item.prevPod != nil { + podIndexer.Add(item.prevPod) + controller.PodUpdated(nil, item.prevPod) + } + + // First we simulate NodeUpdate that should delete 'pod1'. It doesn't know about 'pod2' yet. + controller.NodeUpdated(item.oldNode, item.newNode) + + verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) + fakeClientset.ClearActions() + + // And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well. 
+ podIndexer.Update(item.newPod) + controller.PodUpdated(item.prevPod, item.newPod) + // wait a bit + time.Sleep(timeForControllerToProgressForSanityCheck) + }) + } +} + +func verifyPodActions(t *testing.T, description string, fakeClientset *fake.Clientset, expectPatch, expectDelete bool) { + t.Helper() + podPatched := false + podDeleted := false + // use Poll instead of PollImmediate to give some processing time to the controller that the expected + // actions are likely to be already sent + err := wait.Poll(10*time.Millisecond, 5*time.Second, func() (bool, error) { + for _, action := range fakeClientset.Actions() { + if action.GetVerb() == "patch" && action.GetResource().Resource == "pods" { + podPatched = true + } + if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { + podDeleted = true + } + } + return podPatched == expectPatch && podDeleted == expectDelete, nil + }) + if err != nil { + t.Errorf("Failed waiting for the expected actions: %q", err) + } + if podPatched != expectPatch { + t.Errorf("[%v]Unexpected test result. Expected patch %v, got %v", description, expectPatch, podPatched) + } + if podDeleted != expectDelete { + t.Errorf("[%v]Unexpected test result. Expected delete %v, got %v", description, expectDelete, podDeleted) + } +} + +// TestPodDeletionEvent Verify that the output events are as expected +func TestPodDeletionEvent(t *testing.T) { + f := func(path cmp.Path) bool { + switch path.String() { + // These fields change at runtime, so ignore it + case "LastTimestamp", "FirstTimestamp", "ObjectMeta.Name": + return true + } + return false + } + + t.Run("emitPodDeletionEvent", func(t *testing.T) { + controller := &Controller{} + recorder := testutil.NewFakeRecorder() + controller.recorder = recorder + controller.emitPodDeletionEvent(types.NamespacedName{ + Name: "test", + Namespace: "test", + }) + want := []*corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + }, + InvolvedObject: corev1.ObjectReference{ + Kind: "Pod", + APIVersion: "v1", + Namespace: "test", + Name: "test", + }, + Reason: "TaintManagerEviction", + Type: "Normal", + Count: 1, + Message: "Marking for deletion Pod test/test", + Source: corev1.EventSource{Component: "nodeControllerTest"}, + }, + } + if diff := cmp.Diff(want, recorder.Events, cmp.FilterPath(f, cmp.Ignore())); len(diff) > 0 { + t.Errorf("emitPodDeletionEvent() returned data (-want,+got):\n%s", diff) + } + }) + + t.Run("emitCancelPodDeletionEvent", func(t *testing.T) { + controller := &Controller{} + recorder := testutil.NewFakeRecorder() + controller.recorder = recorder + controller.emitCancelPodDeletionEvent(types.NamespacedName{ + Name: "test", + Namespace: "test", + }) + want := []*corev1.Event{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test", + }, + InvolvedObject: corev1.ObjectReference{ + Kind: "Pod", + APIVersion: "v1", + Namespace: "test", + Name: "test", + }, + Reason: "TaintManagerEviction", + Type: "Normal", + Count: 1, + Message: "Cancelling deletion of Pod test/test", + Source: corev1.EventSource{Component: "nodeControllerTest"}, + }, + } + if diff := cmp.Diff(want, recorder.Events, cmp.FilterPath(f, cmp.Ignore())); len(diff) > 0 { + t.Errorf("emitPodDeletionEvent() returned data (-want,+got):\n%s", diff) + } + }) +}
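// Illustrative sketch (not part of this change): the "wait for expected client
// actions" pattern used by verifyPodActions, expressed with the context-aware
// polling helper from k8s.io/apimachinery/pkg/util/wait. Shown only as a usage
// example under the assumption that the surrounding test imports are available;
// waitForPodDeletion is a hypothetical helper.
func waitForPodDeletion(ctx context.Context, c *fake.Clientset, name string) error {
	return wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) {
		for _, action := range c.Actions() {
			// Only delete actions against pods are of interest.
			da, ok := action.(clienttesting.DeleteActionImpl)
			if !ok || da.GetResource().Resource != "pods" {
				continue
			}
			if da.GetName() == name {
				return true, nil
			}
		}
		return false, nil
	})
}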