DRA device taints: copy tainteviction controller

This is a verbatim copy of the current pkg/controller/tainteviction code,
revision fc268ecd09 (v1.33.0 plus one commit),
minus the TimedWorker helper.

The intent is to modify the code so that it enforces eviction of pods that
use tainted devices, as sketched below.
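
To make the intent concrete, here is a minimal, self-contained sketch of the
per-pod decision this boils down to; the Device type and the way device taints
reach the controller are placeholder assumptions, not part of this commit or
of any Kubernetes API:

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
)

// Device is a hypothetical stand-in for a DRA device as the modified
// controller would see it: a name plus the NoExecute taints set on it.
type Device struct {
	Name   string
	Taints []v1.Taint
}

// evictionDecision applies the same per-pod logic as processPodOnNode,
// but to the taints of the devices a pod uses instead of its node's
// taints. evict == false means the pod tolerates everything forever;
// otherwise "after" is the minimal toleration time (0 = evict now).
func evictionDecision(tolerations []v1.Toleration, devices []Device) (evict bool, after time.Duration) {
	minSeconds := int64(-1) // -1: tolerated forever so far
	for _, dev := range devices {
		for i := range dev.Taints {
			taint := &dev.Taints[i]
			tolerated := false
			for j := range tolerations {
				if !tolerations[j].ToleratesTaint(taint) {
					continue
				}
				tolerated = true
				if s := tolerations[j].TolerationSeconds; s != nil {
					if *s <= 0 {
						return true, 0
					}
					if minSeconds < 0 || *s < minSeconds {
						minSeconds = *s
					}
				}
			}
			if !tolerated {
				// Untolerated NoExecute taint on a used device:
				// evict immediately, as for node taints.
				return true, 0
			}
		}
	}
	if minSeconds < 0 {
		return false, 0
	}
	return true, time.Duration(minSeconds) * time.Second
}

func main() {
	gpu := Device{Name: "gpu-0", Taints: []v1.Taint{{
		Key:    "example.com/unhealthy",
		Effect: v1.TaintEffectNoExecute,
	}}}
	evict, after := evictionDecision(nil, []Device{gpu})
	fmt.Println(evict, after) // true 0s: no toleration, evict immediately
}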
Patrick Ohly 2025-02-05 15:07:58 +01:00
parent 6478ca5859
commit 13d04d4a92
5 changed files with 1642 additions and 0 deletions


@@ -0,0 +1,8 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-scheduling-maintainers
reviewers:
- sig-scheduling
labels:
- sig/scheduling


@@ -0,0 +1,19 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package tainteviction contains the logic implementing taint-based eviction
// for Pods running on Nodes with NoExecute taints.
package tainteviction


@@ -0,0 +1,60 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const taintEvictionControllerSubsystem = "taint_eviction_controller"
var (
// PodDeletionsTotal counts the number of Pods deleted by TaintEvictionController since its start.
PodDeletionsTotal = metrics.NewCounter(
&metrics.CounterOpts{
Subsystem: taintEvictionControllerSubsystem,
Name: "pod_deletions_total",
Help: "Total number of Pods deleted by TaintEvictionController since its start.",
StabilityLevel: metrics.ALPHA,
},
)
// PodDeletionsLatency tracks the latency, in seconds, between the time when a taint effect has been activated
// for the Pod and its deletion.
PodDeletionsLatency = metrics.NewHistogram(
&metrics.HistogramOpts{
Subsystem: taintEvictionControllerSubsystem,
Name: "pod_deletion_duration_seconds",
Help: "Latency, in seconds, between the time when a taint effect has been activated for the Pod and its deletion via TaintEvictionController.",
Buckets: []float64{0.005, 0.025, 0.1, 0.5, 1, 2.5, 10, 30, 60, 120, 180, 240}, // 5ms to 4m
StabilityLevel: metrics.ALPHA,
},
)
)
var registerMetrics sync.Once
// Register registers TaintEvictionController metrics.
func Register() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(PodDeletionsTotal)
legacyregistry.MustRegister(PodDeletionsLatency)
})
}
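// For illustration, once registered these show up in the /metrics output
// under names like (sample values; the histogram also exposes _sum and one
// _bucket per boundary):
//
//	taint_eviction_controller_pod_deletions_total 3
//	taint_eviction_controller_pod_deletion_duration_seconds_bucket{le="1"} 2
//	taint_eviction_controller_pod_deletion_duration_seconds_count 2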


@@ -0,0 +1,614 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tainteviction
import (
"context"
"fmt"
"hash/fnv"
"io"
"math"
"sync"
"time"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
corev1informers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
apipod "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/apis/core/helper"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/tainteviction/metrics"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
utilpod "k8s.io/kubernetes/pkg/util/pod"
)
const (
	// TODO (k82cn): Figure out a reasonable number of workers/channels and propagate
	// the number of workers up, making it a parameter of the Run() function.
// NodeUpdateChannelSize defines the size of channel for node update events.
NodeUpdateChannelSize = 10
	// UpdateWorkerSize defines the number of workers handling node and/or pod updates.
UpdateWorkerSize = 8
podUpdateChannelSize = 1
retries = 5
)
type nodeUpdateItem struct {
nodeName string
}
type podUpdateItem struct {
podName string
podNamespace string
nodeName string
}
func hash(val string, max int) int {
hasher := fnv.New32a()
io.WriteString(hasher, val)
return int(hasher.Sum32() % uint32(max))
}
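// For illustration: hash("node-1", UpdateWorkerSize) deterministically selects
// one of the UpdateWorkerSize channels. Run below hashes pod updates by their
// node name too, so all updates concerning a given node are serialized onto
// the same worker, which the pod/node race reasoning in Run relies on.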
// GetPodsByNodeNameFunc returns the list of pods assigned to the specified node.
type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error)
// Controller listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type Controller struct {
name string
client clientset.Interface
broadcaster record.EventBroadcaster
recorder record.EventRecorder
podLister corelisters.PodLister
podListerSynced cache.InformerSynced
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
getPodsAssignedToNode GetPodsByNodeNameFunc
taintEvictionQueue *TimedWorkerQueue
// keeps a map from nodeName to all noExecute taints on that Node
taintedNodesLock sync.Mutex
taintedNodes map[string][]v1.Taint
nodeUpdateChannels []chan nodeUpdateItem
podUpdateChannels []chan podUpdateItem
nodeUpdateQueue workqueue.TypedInterface[nodeUpdateItem]
podUpdateQueue workqueue.TypedInterface[podUpdateItem]
}
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName), controllerName string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
klog.FromContext(ctx).Info("Deleting pod", "controller", controllerName, "pod", args.NamespacedName)
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {
err = addConditionAndDeletePod(ctx, c, name, ns)
if err == nil {
metrics.PodDeletionsTotal.Inc()
				metrics.PodDeletionsLatency.Observe(time.Since(fireAt).Seconds())
break
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) {
pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return err
}
newStatus := pod.Status.DeepCopy()
updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{
Type: v1.DisruptionTarget,
Status: v1.ConditionTrue,
Reason: "DeletionByTaintManager",
Message: "Taint manager: deleting due to NoExecute taint",
})
if updated {
if _, _, _, err := utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil {
return err
}
}
return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
}
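// For illustration: for a pod that does not yet have the condition, the
// status patch sent by PatchPodStatus above looks roughly like this
// (a merge patch against the status subresource, fields elided):
//
//	{"status":{"conditions":[{"type":"DisruptionTarget","status":"True",
//	    "reason":"DeletionByTaintManager","message":"Taint manager: ..."}]}}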
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
result := []v1.Taint{}
for i := range taints {
if taints[i].Effect == v1.TaintEffectNoExecute {
result = append(result, taints[i])
}
}
return result
}
// getMinTolerationTime returns minimal toleration time from the given slice, or -1 if it's infinite.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
minTolerationTime := int64(math.MaxInt64)
if len(tolerations) == 0 {
return 0
}
for i := range tolerations {
if tolerations[i].TolerationSeconds != nil {
tolerationSeconds := *(tolerations[i].TolerationSeconds)
if tolerationSeconds <= 0 {
return 0
} else if tolerationSeconds < minTolerationTime {
minTolerationTime = tolerationSeconds
}
}
}
if minTolerationTime == int64(math.MaxInt64) {
return -1
}
return time.Duration(minTolerationTime) * time.Second
}
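// For illustration: with tolerations [{TolerationSeconds: nil},
// {TolerationSeconds: ptr(300)}] the result is 300 * time.Second; the nil
// entry means "tolerate forever" and is ignored because another toleration
// does specify seconds. Only if no toleration specifies seconds is -1
// (infinite) returned, and an empty slice yields 0 (evict immediately).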
// New creates a new Controller that will use passed clientset to communicate with the API server.
func New(ctx context.Context, c clientset.Interface, podInformer corev1informers.PodInformer, nodeInformer corev1informers.NodeInformer, controllerName string) (*Controller, error) {
logger := klog.FromContext(ctx)
metrics.Register()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
podIndexer := podInformer.Informer().GetIndexer()
tm := &Controller{
name: controllerName,
client: c,
broadcaster: eventBroadcaster,
recorder: recorder,
podLister: podInformer.Lister(),
podListerSynced: podInformer.Informer().HasSynced,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
getPodsAssignedToNode: func(nodeName string) ([]*v1.Pod, error) {
objs, err := podIndexer.ByIndex("spec.nodeName", nodeName)
if err != nil {
return nil, err
}
pods := make([]*v1.Pod, 0, len(objs))
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
pods = append(pods, pod)
}
return pods, nil
},
taintedNodes: make(map[string][]v1.Taint),
nodeUpdateQueue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[nodeUpdateItem]{Name: "noexec_taint_node"}),
podUpdateQueue: workqueue.NewTypedWithConfig(workqueue.TypedQueueConfig[podUpdateItem]{Name: "noexec_taint_pod"}),
}
tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent, tm.name))
_, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*v1.Pod)
tm.PodUpdated(nil, pod)
},
UpdateFunc: func(prev, obj interface{}) {
prevPod := prev.(*v1.Pod)
newPod := obj.(*v1.Pod)
tm.PodUpdated(prevPod, newPod)
},
DeleteFunc: func(obj interface{}) {
pod, isPod := obj.(*v1.Pod)
// We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
if !isPod {
deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
logger.Error(nil, "Received unexpected object", "object", obj)
return
}
pod, ok = deletedState.Obj.(*v1.Pod)
if !ok {
logger.Error(nil, "DeletedFinalStateUnknown contained non-Pod object", "object", deletedState.Obj)
return
}
}
tm.PodUpdated(pod, nil)
},
})
if err != nil {
return nil, fmt.Errorf("unable to add pod event handler: %w", err)
}
_, err = nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
tm.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
tm.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
tm.NodeUpdated(node, nil)
return nil
}),
})
if err != nil {
return nil, fmt.Errorf("unable to add node event handler: %w", err)
}
return tm, nil
}
// Run starts the controller, which runs in a loop until the context is cancelled.
func (tc *Controller) Run(ctx context.Context) {
defer utilruntime.HandleCrash()
logger := klog.FromContext(ctx)
logger.Info("Starting", "controller", tc.name)
defer logger.Info("Shutting down controller", "controller", tc.name)
// Start events processing pipeline.
tc.broadcaster.StartStructuredLogging(3)
if tc.client != nil {
logger.Info("Sending events to api server")
tc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: tc.client.CoreV1().Events("")})
} else {
logger.Error(nil, "kubeClient is nil", "controller", tc.name)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
defer tc.broadcaster.Shutdown()
defer tc.nodeUpdateQueue.ShutDown()
defer tc.podUpdateQueue.ShutDown()
// wait for the cache to be synced
if !cache.WaitForNamedCacheSync(tc.name, ctx.Done(), tc.podListerSynced, tc.nodeListerSynced) {
return
}
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
nodeUpdate, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(nodeUpdate)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
}
}
}(ctx.Done())
go func(stopCh <-chan struct{}) {
for {
podUpdate, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
// The fact that pods are processed by the same worker as nodes is used to avoid races
// between node worker setting tc.taintedNodes and pod worker reading this to decide
// whether to delete pod.
// It's possible that even without this assumption this code is still correct.
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(podUpdate)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
}
}
}(ctx.Done())
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(ctx, i, wg.Done, ctx.Done())
}
wg.Wait()
}
func (tc *Controller) worker(ctx context.Context, worker int, done func(), stopCh <-chan struct{}) {
defer done()
// When processing events we want to prioritize Node updates over Pod updates,
// as NodeUpdates that interest the controller should be handled as soon as possible -
// we don't want user (or system) to wait until PodUpdate queue is drained before it can
// start evicting Pods from tainted Nodes.
for {
select {
case <-stopCh:
return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(ctx, nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(ctx, nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(ctx, podUpdate)
tc.podUpdateQueue.Done(podUpdate)
}
}
}
// PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
func (tc *Controller) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
podName := ""
podNamespace := ""
nodeName := ""
oldTolerations := []v1.Toleration{}
if oldPod != nil {
podName = oldPod.Name
podNamespace = oldPod.Namespace
nodeName = oldPod.Spec.NodeName
oldTolerations = oldPod.Spec.Tolerations
}
newTolerations := []v1.Toleration{}
if newPod != nil {
podName = newPod.Name
podNamespace = newPod.Namespace
nodeName = newPod.Spec.NodeName
newTolerations = newPod.Spec.Tolerations
}
if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
return
}
updateItem := podUpdateItem{
podName: podName,
podNamespace: podNamespace,
nodeName: nodeName,
}
tc.podUpdateQueue.Add(updateItem)
}
// NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
func (tc *Controller) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
nodeName := ""
oldTaints := []v1.Taint{}
if oldNode != nil {
nodeName = oldNode.Name
oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
}
newTaints := []v1.Taint{}
if newNode != nil {
nodeName = newNode.Name
newTaints = getNoExecuteTaints(newNode.Spec.Taints)
}
if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
return
}
updateItem := nodeUpdateItem{
nodeName: nodeName,
}
tc.nodeUpdateQueue.Add(updateItem)
}
func (tc *Controller) cancelWorkWithEvent(logger klog.Logger, nsName types.NamespacedName) {
if tc.taintEvictionQueue.CancelWork(logger, nsName.String()) {
tc.emitCancelPodDeletionEvent(nsName)
}
}
func (tc *Controller) processPodOnNode(
ctx context.Context,
podNamespacedName types.NamespacedName,
nodeName string,
tolerations []v1.Toleration,
taints []v1.Taint,
now time.Time,
) {
logger := klog.FromContext(ctx)
if len(taints) == 0 {
tc.cancelWorkWithEvent(logger, podNamespacedName)
}
allTolerated, usedTolerations := v1helper.GetMatchingTolerations(taints, tolerations)
if !allTolerated {
logger.V(2).Info("Not all taints are tolerated after update for pod on node", "pod", podNamespacedName.String(), "node", klog.KRef("", nodeName))
// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
tc.cancelWorkWithEvent(logger, podNamespacedName)
tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), now, now)
return
}
minTolerationTime := getMinTolerationTime(usedTolerations)
// getMinTolerationTime returns negative value to denote infinite toleration.
if minTolerationTime < 0 {
logger.V(4).Info("Current tolerations for pod tolerate forever, cancelling any scheduled deletion", "pod", podNamespacedName.String())
tc.cancelWorkWithEvent(logger, podNamespacedName)
return
}
startTime := now
triggerTime := startTime.Add(minTolerationTime)
scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())
if scheduledEviction != nil {
startTime = scheduledEviction.CreatedAt
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
}
tc.cancelWorkWithEvent(logger, podNamespacedName)
}
tc.taintEvictionQueue.AddWork(ctx, NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
}
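// Worked example: an eviction was queued at t0 because the pod tolerated the
// taint for 1s. If the pod is updated to tolerate it for 100s instead,
// startTime becomes the original CreatedAt (t0); t0+100s is before now+100s,
// so the function returns early and the eviction already scheduled for t0+1s
// stands: lengthening a toleration cannot postpone a pending eviction
// (compare the "lengthening toleration shouldn't work" test case).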
func (tc *Controller) handlePodUpdate(ctx context.Context, podUpdate podUpdateItem) {
pod, err := tc.podLister.Pods(podUpdate.podNamespace).Get(podUpdate.podName)
logger := klog.FromContext(ctx)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
logger.V(4).Info("Noticed pod deletion", "pod", podNamespacedName)
tc.cancelWorkWithEvent(logger, podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}
// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
// Create or Update
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
logger.V(4).Info("Noticed pod update", "pod", podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// It's possible that Node was deleted, or Taints were removed before, which triggered
// eviction cancelling if it was needed.
if !ok {
return
}
tc.processPodOnNode(ctx, podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
func (tc *Controller) handleNodeUpdate(ctx context.Context, nodeUpdate nodeUpdateItem) {
node, err := tc.nodeLister.Get(nodeUpdate.nodeName)
logger := klog.FromContext(ctx)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
logger.V(4).Info("Noticed node deletion", "node", klog.KRef("", nodeUpdate.nodeName))
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}
// Create or Update
logger.V(4).Info("Noticed node update", "node", klog.KObj(node))
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
logger.V(4).Info("Updating known taints on node", "node", klog.KObj(node), "taints", taints)
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
	// It is critical that we update tc.taintedNodes before we call getPodsAssignedToNode:
// getPodsAssignedToNode can be delayed as long as all future updates to pods will call
// tc.PodUpdated which will use tc.taintedNodes to potentially delete delayed pods.
pods, err := tc.getPodsAssignedToNode(node.Name)
if err != nil {
logger.Error(err, "Failed to get pods assigned to node", "node", klog.KObj(node))
return
}
if len(pods) == 0 {
return
}
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
logger.V(4).Info("All taints were removed from the node. Cancelling all evictions...", "node", klog.KObj(node))
for i := range pods {
tc.cancelWorkWithEvent(logger, types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
now := time.Now()
for _, pod := range pods {
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(ctx, podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
func (tc *Controller) emitPodDeletionEvent(nsName types.NamespacedName) {
if tc.recorder == nil {
return
}
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: nsName.Name,
Namespace: nsName.Namespace,
}
tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
}
func (tc *Controller) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
if tc.recorder == nil {
return
}
ref := &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: nsName.Name,
Namespace: nsName.Namespace,
}
tc.recorder.Eventf(ref, v1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
}


@@ -0,0 +1,941 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tainteviction
import (
"context"
"fmt"
goruntime "runtime"
"sort"
"testing"
"time"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller/testutil"
)
var timeForControllerToProgressForSanityCheck = 20 * time.Millisecond
func getPodsAssignedToNode(ctx context.Context, c *fake.Clientset) GetPodsByNodeNameFunc {
return func(nodeName string) ([]*corev1.Pod, error) {
selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
pods, err := c.CoreV1().Pods(corev1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: selector.String(),
LabelSelector: labels.Everything().String(),
})
if err != nil {
return []*corev1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
}
rPods := make([]*corev1.Pod, len(pods.Items))
for i := range pods.Items {
rPods[i] = &pods.Items[i]
}
return rPods, nil
}
}
func createNoExecuteTaint(index int) corev1.Taint {
now := metav1.Now()
return corev1.Taint{
Key: "testTaint" + fmt.Sprintf("%v", index),
Value: "test" + fmt.Sprintf("%v", index),
Effect: corev1.TaintEffectNoExecute,
TimeAdded: &now,
}
}
func addToleration(pod *corev1.Pod, index int, duration int64) *corev1.Pod {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
if duration < 0 {
pod.Spec.Tolerations = []corev1.Toleration{{Key: "testTaint" + fmt.Sprintf("%v", index), Value: "test" + fmt.Sprintf("%v", index), Effect: corev1.TaintEffectNoExecute}}
} else {
pod.Spec.Tolerations = []corev1.Toleration{{Key: "testTaint" + fmt.Sprintf("%v", index), Value: "test" + fmt.Sprintf("%v", index), Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &duration}}
}
return pod
}
func addTaintsToNode(node *corev1.Node, key, value string, indices []int) *corev1.Node {
taints := []corev1.Taint{}
for _, index := range indices {
taints = append(taints, createNoExecuteTaint(index))
}
node.Spec.Taints = taints
return node
}
var alwaysReady = func() bool { return true }
func setupNewController(ctx context.Context, fakeClientSet *fake.Clientset) (*Controller, cache.Indexer, cache.Indexer) {
informerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0)
podIndexer := informerFactory.Core().V1().Pods().Informer().GetIndexer()
nodeIndexer := informerFactory.Core().V1().Nodes().Informer().GetIndexer()
mgr, _ := New(ctx, fakeClientSet, informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Nodes(), "taint-eviction-controller")
mgr.podListerSynced = alwaysReady
mgr.nodeListerSynced = alwaysReady
mgr.getPodsAssignedToNode = getPodsAssignedToNode(ctx, fakeClientSet)
return mgr, podIndexer, nodeIndexer
}
type timestampedPod struct {
names []string
timestamp time.Duration
}
type durationSlice []timestampedPod
func (a durationSlice) Len() int { return len(a) }
func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a durationSlice) Less(i, j int) bool { return a[i].timestamp < a[j].timestamp }
func TestFilterNoExecuteTaints(t *testing.T) {
taints := []corev1.Taint{
{
Key: "one",
Value: "one",
Effect: corev1.TaintEffectNoExecute,
},
{
Key: "two",
Value: "two",
Effect: corev1.TaintEffectNoSchedule,
},
}
taints = getNoExecuteTaints(taints)
if len(taints) != 1 || taints[0].Key != "one" {
t.Errorf("Filtering doesn't work. Got %v", taints)
}
}
func TestCreatePod(t *testing.T) {
testCases := []struct {
description string
pod *corev1.Pod
taintedNodes map[string][]corev1.Taint
expectPatch bool
expectDelete bool
}{
{
description: "not scheduled - ignore",
pod: testutil.NewPod("pod1", ""),
taintedNodes: map[string][]corev1.Taint{},
expectDelete: false,
},
{
description: "scheduled on untainted Node",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]corev1.Taint{},
expectDelete: false,
},
{
description: "schedule on tainted Node",
pod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
},
{
description: "schedule on tainted Node with finite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "schedule on tainted Node with infinite toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "schedule on tainted Node with infinite invalid toleration",
pod: addToleration(testutil.NewPod("pod1", "node1"), 2, -1),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: []corev1.Pod{*item.pod}})
controller, podIndexer, _ := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = item.taintedNodes
podIndexer.Add(item.pod)
controller.PodUpdated(nil, item.pod)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
})
}
}
func TestDeletePod(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset()
controller, _, _ := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.taintedNodes = map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
}
controller.PodUpdated(testutil.NewPod("pod1", "node1"), nil)
// wait a bit to see if nothing will panic
time.Sleep(timeForControllerToProgressForSanityCheck)
}
func TestUpdatePod(t *testing.T) {
testCases := []struct {
description string
prevPod *corev1.Pod
awaitForScheduledEviction bool
newPod *corev1.Pod
taintedNodes map[string][]corev1.Taint
expectPatch bool
expectDelete bool
skipOnWindows bool
}{
{
description: "scheduling onto tainted Node",
prevPod: testutil.NewPod("pod1", ""),
newPod: testutil.NewPod("pod1", "node1"),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
},
{
description: "scheduling onto tainted Node with toleration",
prevPod: addToleration(testutil.NewPod("pod1", ""), 1, -1),
newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectDelete: false,
},
{
description: "removing toleration",
prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
newPod: testutil.NewPod("pod1", "node1"),
awaitForScheduledEviction: true,
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
},
{
description: "lengthening toleration shouldn't work",
prevPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 1),
newPod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
awaitForScheduledEviction: true,
taintedNodes: map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
},
expectPatch: true,
expectDelete: true,
skipOnWindows: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
if item.skipOnWindows && goruntime.GOOS == "windows" {
// TODO: remove skip once the flaking test has been fixed.
t.Skip("Skip flaking test on Windows.")
}
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: []corev1.Pod{*item.prevPod}})
controller, podIndexer, _ := setupNewController(context.TODO(), fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = item.taintedNodes
go controller.Run(ctx)
podIndexer.Add(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
if item.awaitForScheduledEviction {
nsName := types.NamespacedName{Namespace: item.prevPod.Namespace, Name: item.prevPod.Name}
err := wait.PollImmediate(time.Millisecond*10, time.Second, func() (bool, error) {
scheduledEviction := controller.taintEvictionQueue.GetWorkerUnsafe(nsName.String())
return scheduledEviction != nil, nil
})
if err != nil {
t.Fatalf("Failed to await for scheduled eviction: %q", err)
}
}
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
})
}
}
func TestCreateNode(t *testing.T) {
testCases := []struct {
description string
pods []corev1.Pod
node *corev1.Node
expectPatch bool
expectDelete bool
}{
{
description: "Creating Node matching already assigned Pod",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
node: testutil.NewNode("node1"),
expectPatch: false,
expectDelete: false,
},
{
description: "Creating tainted Node matching already assigned Pod",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "Creating tainted Node matching already assigned tolerating Pod",
pods: []corev1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, -1),
},
node: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: false,
expectDelete: false,
},
}
for _, item := range testCases {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewController(ctx, fakeClientset)
nodeIndexer.Add(item.node)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(nil, item.node)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
cancel()
}
}
func TestDeleteNode(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fakeClientset := fake.NewSimpleClientset()
controller, _, _ := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = map[string][]corev1.Taint{
"node1": {createNoExecuteTaint(1)},
}
go controller.Run(ctx)
controller.NodeUpdated(testutil.NewNode("node1"), nil)
// await until controller.taintedNodes is empty
err := wait.PollImmediate(10*time.Millisecond, time.Second, func() (bool, error) {
controller.taintedNodesLock.Lock()
defer controller.taintedNodesLock.Unlock()
_, ok := controller.taintedNodes["node1"]
return !ok, nil
})
if err != nil {
t.Errorf("Failed to await for processing node deleted: %q", err)
}
cancel()
}
func TestUpdateNode(t *testing.T) {
testCases := []struct {
description string
pods []corev1.Pod
oldNode *corev1.Node
newNode *corev1.Node
expectPatch bool
expectDelete bool
additionalSleep time.Duration
}{
{
description: "Added taint, expect node patched and deleted",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "Added tolerated taint",
pods: []corev1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
},
{
description: "Only one added taint tolerated",
pods: []corev1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 100),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectPatch: true,
expectDelete: true,
},
{
description: "Taint removed",
pods: []corev1.Pod{
*addToleration(testutil.NewPod("pod1", "node1"), 1, 1),
},
oldNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
newNode: testutil.NewNode("node1"),
expectDelete: false,
additionalSleep: 1500 * time.Millisecond,
},
{
description: "Pod with multiple tolerations are evicted when first one runs out",
pods: []corev1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
},
Spec: corev1.PodSpec{
NodeName: "node1",
Tolerations: []corev1.Toleration{
{Key: "testTaint1", Value: "test1", Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &[]int64{1}[0]},
{Key: "testTaint2", Value: "test2", Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &[]int64{100}[0]},
},
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
},
},
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectPatch: true,
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods})
controller, _, nodeIndexer := setupNewController(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(item.oldNode, item.newNode)
if item.additionalSleep > 0 {
time.Sleep(item.additionalSleep)
}
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
})
}
}
func TestUpdateNodeWithMultipleTaints(t *testing.T) {
taint1 := createNoExecuteTaint(1)
taint2 := createNoExecuteTaint(2)
minute := int64(60)
pod := testutil.NewPod("pod1", "node1")
pod.Spec.Tolerations = []corev1.Toleration{
{Key: taint1.Key, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoExecute},
{Key: taint2.Key, Operator: corev1.TolerationOpExists, Effect: corev1.TaintEffectNoExecute, TolerationSeconds: &minute},
}
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
untaintedNode := testutil.NewNode("node1")
doubleTaintedNode := testutil.NewNode("node1")
doubleTaintedNode.Spec.Taints = []corev1.Taint{taint1, taint2}
singleTaintedNode := testutil.NewNode("node1")
singleTaintedNode.Spec.Taints = []corev1.Taint{taint1}
ctx, cancel := context.WithCancel(context.TODO())
fakeClientset := fake.NewSimpleClientset(pod)
controller, _, nodeIndexer := setupNewController(ctx, fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
// no taint
nodeIndexer.Add(untaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion with no taints")
}
// no taint -> infinitely tolerated taint
nodeIndexer.Update(singleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion with permanently tolerated taint")
}
// infinitely tolerated taint -> temporarily tolerated taint
nodeIndexer.Update(doubleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) == nil {
t.Fatalf("pod not queued for deletion after addition of temporarily tolerated taint")
}
// temporarily tolerated taint -> infinitely tolerated taint
nodeIndexer.Update(singleTaintedNode)
controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"})
// verify pod is not queued for deletion
if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil {
t.Fatalf("pod queued for deletion after removal of temporarily tolerated taint")
}
// verify pod is not deleted
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
t.Error("Unexpected deletion")
}
}
cancel()
}
func TestUpdateNodeWithMultiplePods(t *testing.T) {
testCases := []struct {
description string
pods []corev1.Pod
oldNode *corev1.Node
newNode *corev1.Node
expectedDeleteTimes durationSlice
}{
{
description: "Pods with different toleration times are evicted appropriately",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
*addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectedDeleteTimes: durationSlice{
{[]string{"pod1"}, 0},
{[]string{"pod2"}, time.Second},
},
},
{
description: "Evict all pods not matching all taints instantly",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
*addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectedDeleteTimes: durationSlice{
{[]string{"pod1", "pod2", "pod3"}, 0},
},
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
t.Logf("Starting testcase %q", item.description)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods})
sort.Sort(item.expectedDeleteTimes)
controller, _, nodeIndexer := setupNewController(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
controller.NodeUpdated(item.oldNode, item.newNode)
startedAt := time.Now()
for i := range item.expectedDeleteTimes {
if i == 0 || item.expectedDeleteTimes[i-1].timestamp != item.expectedDeleteTimes[i].timestamp {
// compute a grace duration to give controller time to process updates. Choose big
// enough intervals in the test cases above to avoid flakes.
var increment time.Duration
if i == len(item.expectedDeleteTimes)-1 || item.expectedDeleteTimes[i+1].timestamp == item.expectedDeleteTimes[i].timestamp {
increment = 500 * time.Millisecond
} else {
increment = ((item.expectedDeleteTimes[i+1].timestamp - item.expectedDeleteTimes[i].timestamp) / time.Duration(2))
}
sleepTime := item.expectedDeleteTimes[i].timestamp - time.Since(startedAt) + increment
if sleepTime < 0 {
sleepTime = 0
}
t.Logf("Sleeping for %v", sleepTime)
time.Sleep(sleepTime)
}
for delay, podName := range item.expectedDeleteTimes[i].names {
deleted := false
for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource != "pods" {
continue
}
if podName == deleteAction.GetName() {
deleted = true
}
}
if !deleted {
t.Errorf("Failed to deleted pod %v after %v", podName, delay)
}
}
for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
t.Logf("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource != "pods" {
continue
}
deletedPodName := deleteAction.GetName()
expected := false
for _, podName := range item.expectedDeleteTimes[i].names {
if podName == deletedPodName {
expected = true
}
}
if !expected {
t.Errorf("Pod %v was deleted even though it shouldn't have", deletedPodName)
}
}
fakeClientset.ClearActions()
}
})
}
}
func TestGetMinTolerationTime(t *testing.T) {
one := int64(1)
two := int64(2)
oneSec := 1 * time.Second
tests := []struct {
tolerations []corev1.Toleration
expected time.Duration
}{
{
tolerations: []corev1.Toleration{},
expected: 0,
},
{
tolerations: []corev1.Toleration{
{
TolerationSeconds: nil,
},
},
expected: -1,
},
{
tolerations: []corev1.Toleration{
{
TolerationSeconds: &one,
},
{
TolerationSeconds: &two,
},
},
expected: oneSec,
},
{
tolerations: []corev1.Toleration{
{
TolerationSeconds: &one,
},
{
TolerationSeconds: nil,
},
},
expected: oneSec,
},
{
tolerations: []corev1.Toleration{
{
TolerationSeconds: nil,
},
{
TolerationSeconds: &one,
},
},
expected: oneSec,
},
}
for _, test := range tests {
got := getMinTolerationTime(test.tolerations)
if got != test.expected {
t.Errorf("Incorrect min toleration time: got %v, expected %v", got, test.expected)
}
}
}
// TestEventualConsistency verifies that, if getPodsAssignedToNode returns incomplete
// data (e.g. due to watch latency), the controller eventually reconciles the remaining
// pods. This scenario is partially covered by TestUpdatePod, but given that this is an
// important property of the TaintManager, it's better to have an explicit test for it.
func TestEventualConsistency(t *testing.T) {
testCases := []struct {
description string
pods []corev1.Pod
prevPod *corev1.Pod
newPod *corev1.Pod
oldNode *corev1.Node
newNode *corev1.Node
expectPatch bool
expectDelete bool
}{
{
description: "existing pod2 scheduled onto tainted Node",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: testutil.NewPod("pod2", ""),
newPod: testutil.NewPod("pod2", "node1"),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "existing pod2 with taint toleration scheduled onto tainted Node",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: addToleration(testutil.NewPod("pod2", ""), 1, 100),
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "new pod2 created on tainted Node",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: nil,
newPod: testutil.NewPod("pod2", "node1"),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
{
description: "new pod2 with tait toleration created on tainted Node",
pods: []corev1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: nil,
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectPatch: true,
expectDelete: true,
},
}
for _, item := range testCases {
t.Run(item.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fakeClientset := fake.NewSimpleClientset(&corev1.PodList{Items: item.pods})
controller, podIndexer, nodeIndexer := setupNewController(ctx, fakeClientset)
nodeIndexer.Add(item.newNode)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(ctx)
if item.prevPod != nil {
podIndexer.Add(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
}
// First we simulate NodeUpdate that should delete 'pod1'. It doesn't know about 'pod2' yet.
controller.NodeUpdated(item.oldNode, item.newNode)
verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete)
fakeClientset.ClearActions()
// And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well.
podIndexer.Update(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(timeForControllerToProgressForSanityCheck)
})
}
}
func verifyPodActions(t *testing.T, description string, fakeClientset *fake.Clientset, expectPatch, expectDelete bool) {
t.Helper()
podPatched := false
podDeleted := false
	// use Poll instead of PollImmediate to give the controller some processing time,
	// so that the expected actions are likely to have been sent already
err := wait.Poll(10*time.Millisecond, 5*time.Second, func() (bool, error) {
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "patch" && action.GetResource().Resource == "pods" {
podPatched = true
}
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
return podPatched == expectPatch && podDeleted == expectDelete, nil
})
if err != nil {
t.Errorf("Failed waiting for the expected actions: %q", err)
}
if podPatched != expectPatch {
t.Errorf("[%v]Unexpected test result. Expected patch %v, got %v", description, expectPatch, podPatched)
}
if podDeleted != expectDelete {
t.Errorf("[%v]Unexpected test result. Expected delete %v, got %v", description, expectDelete, podDeleted)
}
}
// TestPodDeletionEvent verifies that the emitted events are as expected.
func TestPodDeletionEvent(t *testing.T) {
f := func(path cmp.Path) bool {
switch path.String() {
		// These fields change at runtime, so ignore them
case "LastTimestamp", "FirstTimestamp", "ObjectMeta.Name":
return true
}
return false
}
t.Run("emitPodDeletionEvent", func(t *testing.T) {
controller := &Controller{}
recorder := testutil.NewFakeRecorder()
controller.recorder = recorder
controller.emitPodDeletionEvent(types.NamespacedName{
Name: "test",
Namespace: "test",
})
want := []*corev1.Event{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
InvolvedObject: corev1.ObjectReference{
Kind: "Pod",
APIVersion: "v1",
Namespace: "test",
Name: "test",
},
Reason: "TaintManagerEviction",
Type: "Normal",
Count: 1,
Message: "Marking for deletion Pod test/test",
Source: corev1.EventSource{Component: "nodeControllerTest"},
},
}
if diff := cmp.Diff(want, recorder.Events, cmp.FilterPath(f, cmp.Ignore())); len(diff) > 0 {
t.Errorf("emitPodDeletionEvent() returned data (-want,+got):\n%s", diff)
}
})
t.Run("emitCancelPodDeletionEvent", func(t *testing.T) {
controller := &Controller{}
recorder := testutil.NewFakeRecorder()
controller.recorder = recorder
controller.emitCancelPodDeletionEvent(types.NamespacedName{
Name: "test",
Namespace: "test",
})
want := []*corev1.Event{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
},
InvolvedObject: corev1.ObjectReference{
Kind: "Pod",
APIVersion: "v1",
Namespace: "test",
Name: "test",
},
Reason: "TaintManagerEviction",
Type: "Normal",
Count: 1,
Message: "Cancelling deletion of Pod test/test",
Source: corev1.EventSource{Component: "nodeControllerTest"},
},
}
if diff := cmp.Diff(want, recorder.Events, cmp.FilterPath(f, cmp.Ignore())); len(diff) > 0 {
t.Errorf("emitPodDeletionEvent() returned data (-want,+got):\n%s", diff)
}
})
}