Apply davidopp's comments to TaintController PR

gmarek 2017-02-13 11:48:34 +01:00
parent 65cfd86c89
commit de6c9bd535
13 changed files with 376 additions and 109 deletions

View File

@ -403,6 +403,9 @@ func DeleteTaint(taints []Taint, taintToDelete *Taint) ([]Taint, bool) {
// Returns true and list of Tolerations matching all Taints if all are tolerated, or false otherwise.
func GetMatchingTolerations(taints []Taint, tolerations []Toleration) (bool, []Toleration) {
if len(taints) == 0 {
return true, []Toleration{}
}
if len(tolerations) == 0 && len(taints) > 0 {
return false, []Toleration{}
}
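
For reference, the two guard clauses added above fix the helper's behaviour for empty inputs. A minimal, self-contained sketch of that contract, using stand-in types rather than the real v1 Taint/Toleration and deliberately eliding the matching loop:

// Sketch only: the empty-input contract of GetMatchingTolerations.
// Taint and Toleration are stand-ins for the v1 types used in this file.
package taintsketch

type Taint struct{ Key, Value, Effect string }
type Toleration struct{ Key, Value, Effect string }

func getMatchingTolerationsSketch(taints []Taint, tolerations []Toleration) (bool, []Toleration) {
	if len(taints) == 0 {
		// Nothing to tolerate: trivially satisfied, no tolerations were needed.
		return true, []Toleration{}
	}
	if len(tolerations) == 0 {
		// Taints are present but the pod tolerates nothing.
		return false, []Toleration{}
	}
	// The real helper walks every taint and collects the tolerations that
	// match it, returning false as soon as one taint is unmatched.
	return false, nil
}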

View File

@ -26,6 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
@ -79,6 +80,12 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cidrAllocator"})
eventBroadcaster.StartLogging(glog.Infof)
if client != nil {
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
} else {
glog.Fatalf("kubeClient is nil when starting NodeController")
}
ra := &rangeAllocator{
client: client,

View File

@ -142,6 +142,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
if err = rangeAllocator.cidrs.occupy(cidr); err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
}
@ -223,6 +224,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
err = rangeAllocator.cidrs.occupy(cidr)
if err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
@ -334,6 +336,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return
}
rangeAllocator.recorder = testutil.NewFakeRecorder()
err = rangeAllocator.cidrs.occupy(cidr)
if err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)

View File

@ -26,8 +26,11 @@ import (
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@ -261,7 +264,7 @@ func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.Nod
}
func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
ref := &v1.ObjectReference{
ref := &clientv1.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeUID),
@ -272,7 +275,7 @@ func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
}
func recordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_status string) {
ref := &v1.ObjectReference{
ref := &clientv1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: node.UID,

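The switch from v1.ObjectReference to clientv1.ObjectReference here follows from the recorder being a client-go record.EventRecorder, which serializes client-go API types. A hedged sketch of the emit pattern, mirroring the emit helpers added to the taint manager later in this diff (the message format and reason string are illustrative, not the committed strings):

// Inside a helper like recordNodeEvent: build a client-go reference for the
// Node and hand it to the recorder. nodeName, nodeUID, eventtype, reason and
// event are the parameters shown in the hunk above.
ref := &clientv1.ObjectReference{
	Kind:      "Node",
	Name:      nodeName,
	UID:       types.UID(nodeUID),
	Namespace: "",
}
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)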
View File

@ -198,7 +198,7 @@ func NewNodeController(
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
} else {
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
glog.Fatalf("kubeClient is nil when starting NodeController")
}
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {

View File

@ -550,6 +550,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
for _, ds := range item.daemonSets {
nodeController.daemonSetInformer.Informer().GetStore().Add(&ds)
}
@ -694,6 +695,7 @@ func TestPodStatusChange(t *testing.T) {
evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1213,6 +1215,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
nodeController.enterPartialDisruptionFunc = func(nodeNum int) float32 {
return testRateLimiterQPS
}
nodeController.recorder = testutil.NewFakeRecorder()
nodeController.enterFullDisruptionFunc = func(nodeNum int) float32 {
return testRateLimiterQPS
}
@ -1303,6 +1306,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.recorder = testutil.NewFakeRecorder()
nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
return false, nil
}
@ -1570,6 +1574,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -1803,6 +1808,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
nodeController.now = func() metav1.Time { return fakeNow }
nodeController.recorder = testutil.NewFakeRecorder()
if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -30,6 +30,9 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"github.com/golang/glog"
@ -94,7 +97,9 @@ type podUpdateItem struct {
// NoExecuteTaint manager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
client clientset.Interface
client clientset.Interface
recorder record.EventRecorder
taintEvictionQueue *TimedWorkerQueue
// keeps a map from nodeName to all noExecute taints on that Node
taintedNodesLock sync.Mutex
@ -107,11 +112,14 @@ type NoExecuteTaintManager struct {
podUpdateQueue workqueue.Interface
}
func deletePodHandler(c clientset.Interface) func(args *WorkArgs) error {
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
return func(args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
glog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {
err = c.Core().Pods(ns).Delete(name, &metav1.DeleteOptions{})
@ -124,7 +132,7 @@ func deletePodHandler(c clientset.Interface) func(args *WorkArgs) error {
}
}
func getNonExecuteTaints(taints []v1.Taint) []v1.Taint {
func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
result := []v1.Taint{}
for i := range taints {
if taints[i].Effect == v1.TaintEffectNoExecute {
@ -156,19 +164,24 @@ func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, er
// Returns minimal toleration time from the given slice, or -1 if it's infinite.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
minTolerationTime := int64(-1)
if len(tolerations) == 0 {
return 0
}
if tolerations[0].TolerationSeconds != nil {
tolerationSeconds := *(tolerations[0].TolerationSeconds)
if tolerationSeconds <= 0 {
return 0
} else {
minTolerationTime = tolerationSeconds
}
}
for i := range tolerations {
if tolerations[i].TolerationSeconds != nil {
if minTolerationTime < 0 {
minTolerationTime = *(tolerations[i].TolerationSeconds)
} else {
tolerationSeconds := *(tolerations[i].TolerationSeconds)
if tolerationSeconds < minTolerationTime {
if tolerationSeconds < 0 {
minTolerationTime = 0
} else {
minTolerationTime = tolerationSeconds
}
}
tolerationSeconds := *(tolerations[i].TolerationSeconds)
if tolerationSeconds <= 0 {
return 0
} else if tolerationSeconds < minTolerationTime {
minTolerationTime = tolerationSeconds
}
}
}
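Because the old and new bodies of getMinTolerationTime are interleaved in the hunk above, here is a self-contained sketch of the semantics the new version aims for, using plain *int64 values instead of v1.Toleration. It mirrors, but is not, the committed helper (which additionally returns 0 for an empty slice):

// Sketch: minimum finite toleration. nil means "no limit for this toleration",
// a non-positive value means "evict immediately", all-nil means "tolerate forever".
package main

import (
	"fmt"
	"time"
)

func minTolerationSketch(tolerationSeconds []*int64) time.Duration {
	min := int64(-1) // -1 encodes "infinite"
	for _, s := range tolerationSeconds {
		if s == nil {
			continue
		}
		if *s <= 0 {
			return 0
		}
		if min < 0 || *s < min {
			min = *s
		}
	}
	if min < 0 {
		return -1 // callers treat a negative duration as "never evict"
	}
	return time.Duration(min) * time.Second
}

func main() {
	five, two, neg := int64(5), int64(2), int64(-3)
	fmt.Println(minTolerationSketch([]*int64{&five, &two})) // 2s
	fmt.Println(minTolerationSketch([]*int64{&five, nil}))  // 5s
	fmt.Println(minTolerationSketch([]*int64{&neg, &two}))  // 0s
}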
@ -178,20 +191,34 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
return &NoExecuteTaintManager{
client: c,
taintEvictionQueue: CreateWorkerQueue(deletePodHandler(c)),
taintedNodes: make(map[string][]v1.Taint),
nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
podUpdateChannel: make(chan *podUpdateItem, podUpdateChannelSize),
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"})
eventBroadcaster.StartLogging(glog.Infof)
if c != nil {
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(c.Core().RESTClient()).Events("")})
} else {
glog.Fatalf("kubeClient is nil when starting NodeController")
}
tm := &NoExecuteTaintManager{
client: c,
recorder: recorder,
taintedNodes: make(map[string][]v1.Taint),
nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
podUpdateChannel: make(chan *podUpdateItem, podUpdateChannelSize),
nodeUpdateQueue: workqueue.New(),
podUpdateQueue: workqueue.New(),
}
tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))
return tm
}
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
glog.V(0).Infof("Starting NoExecuteTaintManager")
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
@ -294,7 +321,7 @@ func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node)
return
}
}
oldTaints = getNonExecuteTaints(oldTaints)
oldTaints = getNoExecuteTaints(oldTaints)
newTaints := []v1.Taint{}
if newNode != nil {
@ -304,7 +331,7 @@ func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node)
return
}
}
newTaints = getNonExecuteTaints(newTaints)
newTaints = getNoExecuteTaints(newTaints)
if oldNode != nil && newNode != nil && api.Semantic.DeepEqual(oldTaints, newTaints) {
return
@ -318,6 +345,12 @@ func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node)
tc.nodeUpdateQueue.Add(updateItemInterface(updateItem))
}
func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) {
if tc.taintEvictionQueue.CancelWork(nsName.String()) {
tc.emitCancelPodDeletionEvent(nsName)
}
}
func (tc *NoExecuteTaintManager) processPodOnNode(
podNamespacedName types.NamespacedName,
nodeName string,
@ -325,11 +358,14 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
taints []v1.Taint,
now time.Time,
) {
if len(taints) == 0 {
tc.cancelWorkWithEvent(podNamespacedName)
}
allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations)
if !allTolerated {
glog.V(2).Infof("Not all taints are tolerated after upgrade for Pod %v on %v", podNamespacedName.String(), nodeName)
glog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
// We're canceling scheduled work (if any), as we're going to delete the Pod right away.
tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
tc.cancelWorkWithEvent(podNamespacedName)
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
return
}
@ -348,7 +384,7 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
if startTime.Add(minTolerationTime).Before(triggerTime) {
return
} else {
tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
tc.cancelWorkWithEvent(podNamespacedName)
}
}
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
@ -359,14 +395,14 @@ func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
if podUpdate.newPod == nil {
pod := podUpdate.oldPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof("Noticed pod deletion: %v", podNamespacedName.String())
tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
return
}
// Create or Update
pod := podUpdate.newPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof("Noticed pod update: %v", podNamespacedName.String())
glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
@ -377,6 +413,8 @@ func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// It's possible that the Node was deleted, or its Taints were removed earlier,
// which already triggered eviction cancelling if it was needed.
if !ok {
return
}
@ -387,20 +425,25 @@ func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
// Delete
if nodeUpdate.newNode == nil {
node := nodeUpdate.oldNode
glog.V(4).Infof("Noticed node deletion: %v", node.Name)
glog.V(4).Infof("Noticed node deletion: %#v", node.Name)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, node.Name)
return
}
// Create or Update
glog.V(4).Infof("Noticed node update: %v", nodeUpdate)
glog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
node := nodeUpdate.newNode
taints := nodeUpdate.newTaints
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
tc.taintedNodes[node.Name] = taints
glog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
pods, err := getPodsAssignedToNode(tc.client, node.Name)
if err != nil {
@ -410,10 +453,11 @@ func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
if len(pods) == 0 {
return
}
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
glog.V(4).Infof("All taints were removed from the Node. Cancelling all evictions...")
glog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
for i := range pods {
tc.taintEvictionQueue.CancelWork(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name}.String())
tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
@ -430,3 +474,27 @@ func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
tc.processPodOnNode(podNamespacedName, node.Name, tolerations, taints, now)
}
}
func (tc *NoExecuteTaintManager) emitPodDeletionEvent(nsName types.NamespacedName) {
if tc.recorder == nil {
return
}
ref := &clientv1.ObjectReference{
Kind: "Pod",
Name: nsName.Name,
Namespace: nsName.Namespace,
}
tc.recorder.Eventf(ref, clientv1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
}
func (tc *NoExecuteTaintManager) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
if tc.recorder == nil {
return
}
ref := &clientv1.ObjectReference{
Kind: "Pod",
Name: nsName.Name,
Namespace: nsName.Namespace,
}
tc.recorder.Eventf(ref, clientv1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
}
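A test-side sketch of how the new recorder plumbing can be exercised, mirroring the tests below; the assertion on recorder.Events is hypothetical and assumes NewFakeRecorder returns the concrete *FakeRecorder with its Events slice initialized:

// Sketch: build the manager from a fake clientset, swap in the fake recorder
// so no API server is contacted, drive it, then inspect the recorded events.
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
recorder := testutil.NewFakeRecorder()
controller.recorder = recorder
go controller.Run(stopCh)
controller.NodeUpdated(nil, someTaintedNode) // someTaintedNode: illustrative *v1.Node
// ... wait long enough for the eviction to fire ...
for _, ev := range recorder.Events { // a real test would synchronize this access
	if ev.Reason == "TaintManagerEviction" {
		// the deletion (or its cancellation) was surfaced as an event
	}
}
close(stopCh)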

View File

@ -19,6 +19,7 @@ package node
import (
"encoding/json"
"fmt"
"sort"
"testing"
"time"
@ -27,9 +28,94 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/controller/node/testutil"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clienttesting "k8s.io/client-go/testing"
)
func createNoExecuteTaint(index int) v1.Taint {
return v1.Taint{
Key: "testTaint" + fmt.Sprintf("%v", index),
Value: "test" + fmt.Sprintf("%v", index),
Effect: v1.TaintEffectNoExecute,
TimeAdded: metav1.Now(),
}
}
func addToleration(pod *v1.Pod, index int, duration int64) *v1.Pod {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
if duration < 0 {
pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
[
{
"key": "testTaint` + fmt.Sprintf("%v", index) + `",
"value": "test` + fmt.Sprintf("%v", index) + `",
"effect": "` + string(v1.TaintEffectNoExecute) + `"
}
]`
} else {
pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
[
{
"key": "testTaint` + fmt.Sprintf("%v", index) + `",
"value": "test` + fmt.Sprintf("%v", index) + `",
"effect": "` + string(v1.TaintEffectNoExecute) + `",
"tolerationSeconds": ` + fmt.Sprintf("%v", duration) + `
}
]`
}
return pod
}
func addTaintsToNode(node *v1.Node, key, value string, indices []int) *v1.Node {
taints := []v1.Taint{}
for _, index := range indices {
taints = append(taints, createNoExecuteTaint(index))
}
taintsData, err := json.Marshal(taints)
if err != nil {
panic(err)
}
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[v1.TaintsAnnotationKey] = string(taintsData)
return node
}
type timestampedPod struct {
name string
timestamp time.Duration
}
type durationSlice []timestampedPod
func (a durationSlice) Len() int { return len(a) }
func (a durationSlice) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a durationSlice) Less(i, j int) bool { return a[i].timestamp < a[j].timestamp }
func TestFilterNoExecuteTaints(t *testing.T) {
taints := []v1.Taint{
{
Key: "one",
Value: "one",
Effect: v1.TaintEffectNoExecute,
},
{
Key: "two",
Value: "two",
Effect: v1.TaintEffectNoSchedule,
},
}
taints = getNoExecuteTaints(taints)
if len(taints) != 1 || taints[0].Key != "one" {
t.Errorf("Filtering doesn't work. Got %v", taints)
}
}
func TestComputeTaintDifference(t *testing.T) {
testCases := []struct {
lhs []v1.Taint
@ -123,42 +209,6 @@ func TestComputeTaintDifference(t *testing.T) {
}
}
func createNoExecuteTaint(index int) v1.Taint {
return v1.Taint{
Key: "testTaint" + fmt.Sprintf("%v", index),
Value: "test" + fmt.Sprintf("%v", index),
Effect: v1.TaintEffectNoExecute,
TimeAdded: metav1.Now(),
}
}
func addToleration(pod *v1.Pod, index int, duration int64) *v1.Pod {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
if duration < 0 {
pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
[
{
"key": "testTaint` + fmt.Sprintf("%v", index) + `",
"value": "test` + fmt.Sprintf("%v", index) + `",
"effect": "` + string(v1.TaintEffectNoExecute) + `"
}
]`
} else {
pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
[
{
"key": "testTaint` + fmt.Sprintf("%v", index) + `",
"value": "test` + fmt.Sprintf("%v", index) + `",
"effect": "` + string(v1.TaintEffectNoExecute) + `",
"tolerationSeconds": ` + fmt.Sprintf("%v", duration) + `
}
]`
}
return pod
}
func TestCreatePod(t *testing.T) {
testCases := []struct {
description string
@ -216,6 +266,7 @@ func TestCreatePod(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.taintedNodes = item.taintedNodes
controller.PodUpdated(nil, item.pod)
@ -239,6 +290,7 @@ func TestDeletePod(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
@ -301,6 +353,7 @@ func TestUpdatePod(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.taintedNodes = item.taintedNodes
@ -327,23 +380,6 @@ func TestUpdatePod(t *testing.T) {
}
}
func addTaintsToNode(node *v1.Node, key, value string, indices []int) *v1.Node {
taints := []v1.Taint{}
for _, index := range indices {
taints = append(taints, createNoExecuteTaint(index))
}
taintsData, err := json.Marshal(taints)
if err != nil {
panic(err)
}
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[v1.TaintsAnnotationKey] = string(taintsData)
return node
}
func TestCreateNode(t *testing.T) {
testCases := []struct {
description string
@ -381,6 +417,7 @@ func TestCreateNode(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller := NewNoExecuteTaintManager(fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.NodeUpdated(nil, item.node)
// wait a bit
@ -403,6 +440,7 @@ func TestDeleteNode(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
}
@ -464,12 +502,57 @@ func TestUpdateNode(t *testing.T) {
expectDelete: false,
additionalSleep: 1500 * time.Millisecond,
},
{
description: "Pod with multiple tolerations are victed when first one runs out",
pods: []v1.Pod{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
Annotations: map[string]string{
"scheduler.alpha.kubernetes.io/tolerations": `
[
{
"key": "testTaint1",
"value": "test1",
"effect": "` + string(v1.TaintEffectNoExecute) + `",
"tolerationSeconds": ` + fmt.Sprintf("%v", 1) + `
},
{
"key": "testTaint2",
"value": "test2",
"effect": "` + string(v1.TaintEffectNoExecute) + `",
"tolerationSeconds": ` + fmt.Sprintf("%v", 100) + `
}
]
`,
},
},
Spec: v1.PodSpec{
NodeName: "node1",
},
Status: v1.PodStatus{
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
},
},
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectDelete: true,
additionalSleep: 1500 * time.Millisecond,
},
}
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller := NewNoExecuteTaintManager(fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.NodeUpdated(item.oldNode, item.newNode)
// wait a bit
@ -490,3 +573,85 @@ func TestUpdateNode(t *testing.T) {
close(stopCh)
}
}
func TestUpdateNodeWithMultiplePods(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
oldNode *v1.Node
newNode *v1.Node
expectedDeleteTimes durationSlice
}{
{
description: "Pods with different toleration times are evicted appropriately",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
*addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectedDeleteTimes: durationSlice{
{"pod1", 0},
{"pod2", time.Second},
},
},
{
description: "Evict all pods not maching all taints instantly",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
*addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
*addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
},
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
expectedDeleteTimes: durationSlice{
{"pod1", 0},
{"pod2", 0},
},
},
}
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
sort.Sort(item.expectedDeleteTimes)
controller := NewNoExecuteTaintManager(fakeClientset)
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.NodeUpdated(item.oldNode, item.newNode)
sleptAlready := time.Duration(0)
for i := range item.expectedDeleteTimes {
var increment time.Duration
if i == 0 || item.expectedDeleteTimes[i-1].timestamp != item.expectedDeleteTimes[i].timestamp {
if i == len(item.expectedDeleteTimes)-1 || item.expectedDeleteTimes[i+1].timestamp == item.expectedDeleteTimes[i].timestamp {
increment = 200 * time.Millisecond
} else {
increment = ((item.expectedDeleteTimes[i+1].timestamp - item.expectedDeleteTimes[i].timestamp) / time.Duration(2))
}
sleepTime := item.expectedDeleteTimes[i].timestamp - sleptAlready + increment
glog.Infof("Sleeping for %v", sleepTime)
time.Sleep(sleepTime)
sleptAlready = item.expectedDeleteTimes[i].timestamp + increment
}
podDeleted := false
for _, action := range fakeClientset.Actions() {
deleteAction, ok := action.(clienttesting.DeleteActionImpl)
if !ok {
glog.Infof("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
continue
}
if deleteAction.GetResource().Resource == "pods" && deleteAction.GetName() == item.expectedDeleteTimes[i].name {
podDeleted = true
}
}
if !podDeleted {
t.Errorf("%v: Unexepected test result. Expected delete %v which didn't happen", item.description, item.expectedDeleteTimes[i].name)
}
}
close(stopCh)
}
}
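To make the timing loop above concrete, take the first case, where the sorted expectedDeleteTimes is {pod1: 0s, pod2: 1s}. At i=0 the next timestamp differs and i is not the last index, so increment = (1s - 0s) / 2 = 500ms; the test sleeps 0s - 0s + 500ms = 500ms, asserts pod1 was deleted, and sets sleptAlready to 500ms. At i=1 the entry is the last one, so increment = 200ms and the test sleeps 1s - 500ms + 200ms = 700ms before asserting pod2 was deleted. Entries that share a timestamp are checked without sleeping again.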

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/util/clock"
"k8s.io/kubernetes/pkg/api"
@ -36,6 +37,8 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
utilnode "k8s.io/kubernetes/pkg/util/node"
"github.com/golang/glog"
)
// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
@ -227,6 +230,7 @@ func (m *FakeNodeHandler) Patch(name string, pt types.PatchType, data []byte, su
// FakeRecorder is used as a fake during testing.
type FakeRecorder struct {
sync.Mutex
source clientv1.EventSource
Events []*clientv1.Event
clock clock.Clock
@ -247,20 +251,21 @@ func (f *FakeRecorder) PastEventf(obj runtime.Object, timestamp metav1.Time, eve
}
func (f *FakeRecorder) generateEvent(obj runtime.Object, timestamp metav1.Time, eventtype, reason, message string) {
ref, err := v1.GetReference(api.Scheme, obj)
f.Lock()
defer f.Unlock()
ref, err := clientv1.GetReference(api.Scheme, obj)
if err != nil {
glog.Errorf("Encoutered error while getting reference: %v", err)
return
}
event := f.makeEvent(ref, eventtype, reason, message)
event.Source = f.source
if f.Events != nil {
fmt.Println("write event")
f.Events = append(f.Events, event)
}
}
func (f *FakeRecorder) makeEvent(ref *v1.ObjectReference, eventtype, reason, message string) *clientv1.Event {
fmt.Println("make event")
func (f *FakeRecorder) makeEvent(ref *clientv1.ObjectReference, eventtype, reason, message string) *clientv1.Event {
t := metav1.Time{Time: f.clock.Now()}
namespace := ref.Namespace
if namespace == "" {

View File

@ -25,7 +25,7 @@ import (
"github.com/golang/glog"
)
// WorkArgs keeps arguments that will be passed to tha function executed by the worker.
// WorkArgs keeps arguments that will be passed to the function executed by the worker.
type WorkArgs struct {
NamespacedName types.NamespacedName
}
@ -50,7 +50,7 @@ type TimedWorker struct {
// CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
delay := fireAt.Sub(time.Now())
delay := fireAt.Sub(createdAt)
if delay <= 0 {
go f(args)
return nil
@ -71,9 +71,10 @@ func (w *TimedWorker) Cancel() {
}
}
// TimedWorkerQueue keeps a set of TimedWorkers that still wait for execution.
// TimedWorkerQueue keeps a set of TimedWorkers that are still waiting for execution.
type TimedWorkerQueue struct {
sync.Mutex
// map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
workers map[string]*TimedWorker
workFunc func(args *WorkArgs) error
}
@ -93,6 +94,8 @@ func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs)
q.Lock()
defer q.Unlock()
if err == nil {
// To avoid duplicated calls we keep the key in the queue, so that
// subsequent additions of the same work are no-ops.
q.workers[key] = nil
} else {
delete(q.workers, key)
@ -104,6 +107,7 @@ func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs)
// AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.
func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
key := args.KeyFromWorkArgs()
glog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)
q.Lock()
defer q.Unlock()
@ -112,21 +116,24 @@ func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt t
return
}
worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
if worker == nil {
return
}
q.workers[key] = worker
}
// CancelWork removes scheduled function execution from the queue.
func (q *TimedWorkerQueue) CancelWork(key string) {
// CancelWork removes scheduled function execution from the queue. Returns true if work was cancelled.
func (q *TimedWorkerQueue) CancelWork(key string) bool {
glog.V(4).Infof("Cancelling TimedWorkerQueue item %v at %v", key, time.Now())
q.Lock()
defer q.Unlock()
worker, found := q.workers[key]
result := false
if found {
worker.Cancel()
if worker != nil {
result = true
worker.Cancel()
}
delete(q.workers, key)
}
return result
}
// GetWorkerUnsafe returns a TimedWorker corresponding to the given key.
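
A minimal usage sketch of the queue after this change; the pod and namespace names are illustrative. CancelWork's new boolean return is what lets the taint manager decide whether to emit a cancellation event:

// Sketch: schedule deferred work and cancel it before it fires.
queue := CreateWorkerQueue(func(args *WorkArgs) error {
	// runs once the deadline passes, e.g. delete the pod named by args
	return nil
})
now := time.Now()
queue.AddWork(NewWorkArgs("pod1", "ns1"), now, now.Add(100*time.Millisecond))
if queue.CancelWork(NewWorkArgs("pod1", "ns1").KeyFromWorkArgs()) {
	// the work was still pending and is now cancelled; safe to emit a
	// "cancelling deletion" style event here
}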

View File

@ -26,7 +26,7 @@ import (
func TestExecute(t *testing.T) {
testVal := int32(0)
wg := sync.WaitGroup{}
wg.Add(10)
wg.Add(5)
queue := CreateWorkerQueue(func(args *WorkArgs) error {
atomic.AddInt32(&testVal, 1)
wg.Done()
@ -38,6 +38,7 @@ func TestExecute(t *testing.T) {
queue.AddWork(NewWorkArgs("3", "3"), now, now)
queue.AddWork(NewWorkArgs("4", "4"), now, now)
queue.AddWork(NewWorkArgs("5", "5"), now, now)
// Adding the same thing a second time should be a no-op
queue.AddWork(NewWorkArgs("1", "1"), now, now)
queue.AddWork(NewWorkArgs("2", "2"), now, now)
queue.AddWork(NewWorkArgs("3", "3"), now, now)
@ -45,8 +46,8 @@ func TestExecute(t *testing.T) {
queue.AddWork(NewWorkArgs("5", "5"), now, now)
wg.Wait()
lastVal := atomic.LoadInt32(&testVal)
if lastVal != 10 {
t.Errorf("Espected testVal = 10, got %v", lastVal)
if lastVal != 5 {
t.Errorf("Espected testVal = 5, got %v", lastVal)
}
}
@ -88,7 +89,7 @@ func TestCancel(t *testing.T) {
return nil
})
now := time.Now()
then := now.Add(time.Second)
then := now.Add(100 * time.Millisecond)
queue.AddWork(NewWorkArgs("1", "1"), now, then)
queue.AddWork(NewWorkArgs("2", "2"), now, then)
queue.AddWork(NewWorkArgs("3", "3"), now, then)
@ -118,7 +119,7 @@ func TestCancelAndReadd(t *testing.T) {
return nil
})
now := time.Now()
then := now.Add(time.Second)
then := now.Add(100 * time.Millisecond)
queue.AddWork(NewWorkArgs("1", "1"), now, then)
queue.AddWork(NewWorkArgs("2", "2"), now, then)
queue.AddWork(NewWorkArgs("3", "3"), now, then)

View File

@ -128,7 +128,7 @@ func createPodForTaintsTest(hasToleration bool, tolerationSeconds int, podName,
}
}
// Creates and startes a controller (informer) that watches updates on a pod in given namespace with given name. It puts a new
// Creates and starts a controller (informer) that watches updates on a pod in given namespace with given name. It puts a new
// struct into observedDeletion channel for every deletion it sees.
func createTestController(cs clientset.Interface, observedDeletions chan struct{}, stopCh chan struct{}, podName, ns string) {
_, controller := cache.NewInformer(

View File

@ -71,7 +71,6 @@ func WaitUntilPodIsScheduled(c clientset.Interface, name, namespace string, time
}
func RunPodAndGetNodeName(c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
retries := 5
name := pod.Name
namespace := pod.Namespace
var err error