diff --git a/pkg/api/v1/helpers.go b/pkg/api/v1/helpers.go
index 466dd8fc9ea..2f5587e908a 100644
--- a/pkg/api/v1/helpers.go
+++ b/pkg/api/v1/helpers.go
@@ -403,6 +403,9 @@ func DeleteTaint(taints []Taint, taintToDelete *Taint) ([]Taint, bool) {
 // Returns true and list of Tolerations matching all Taints if all are tolerated, or false otherwise.
 func GetMatchingTolerations(taints []Taint, tolerations []Toleration) (bool, []Toleration) {
+    if len(taints) == 0 {
+        return true, []Toleration{}
+    }
     if len(tolerations) == 0 && len(taints) > 0 {
         return false, []Toleration{}
     }
diff --git a/pkg/controller/node/cidr_allocator.go b/pkg/controller/node/cidr_allocator.go
index 22d7e7d0efd..d2bc7f99860 100644
--- a/pkg/controller/node/cidr_allocator.go
+++ b/pkg/controller/node/cidr_allocator.go
@@ -26,6 +26,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
+    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
     clientv1 "k8s.io/client-go/pkg/api/v1"
     "k8s.io/client-go/tools/record"
     "k8s.io/kubernetes/pkg/api"
@@ -79,6 +80,12 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
     eventBroadcaster := record.NewBroadcaster()
     recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cidrAllocator"})
     eventBroadcaster.StartLogging(glog.Infof)
+    if client != nil {
+        glog.V(0).Infof("Sending events to api server.")
+        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
+    } else {
+        glog.Fatalf("kubeClient is nil when starting NodeController")
+    }
 
     ra := &rangeAllocator{
         client: client,
diff --git a/pkg/controller/node/cidr_allocator_test.go b/pkg/controller/node/cidr_allocator_test.go
index 77fcf584bb2..0c685bba94a 100644
--- a/pkg/controller/node/cidr_allocator_test.go
+++ b/pkg/controller/node/cidr_allocator_test.go
@@ -142,6 +142,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
             t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
             return
         }
+        rangeAllocator.recorder = testutil.NewFakeRecorder()
         if err = rangeAllocator.cidrs.occupy(cidr); err != nil {
             t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
         }
@@ -223,6 +224,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
             t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
             return
         }
+        rangeAllocator.recorder = testutil.NewFakeRecorder()
         err = rangeAllocator.cidrs.occupy(cidr)
         if err != nil {
             t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
@@ -334,6 +336,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
             t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
             return
         }
+        rangeAllocator.recorder = testutil.NewFakeRecorder()
         err = rangeAllocator.cidrs.occupy(cidr)
         if err != nil {
             t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
diff --git a/pkg/controller/node/controller_utils.go b/pkg/controller/node/controller_utils.go
index 4724909ca9f..fdb52f07cba 100644
--- a/pkg/controller/node/controller_utils.go
+++ b/pkg/controller/node/controller_utils.go
@@ -26,8 +26,11 @@ import (
     "k8s.io/apimachinery/pkg/types"
     utilerrors "k8s.io/apimachinery/pkg/util/errors"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+
+    clientv1 "k8s.io/client-go/pkg/api/v1"
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
+
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/api/v1"
     "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -261,7 +264,7 @@ func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.Nod
 }
 
 func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) {
-    ref := &v1.ObjectReference{
+    ref := &clientv1.ObjectReference{
         Kind: "Node",
         Name: nodeName,
         UID:  types.UID(nodeUID),
@@ -272,7 +275,7 @@ func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype
 }
 
 func recordNodeStatusChange(recorder record.EventRecorder, node *v1.Node, new_status string) {
-    ref := &v1.ObjectReference{
+    ref := &clientv1.ObjectReference{
         Kind: "Node",
         Name: node.Name,
         UID:  node.UID,
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index 2a915d7c095..9ed022bcf87 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -198,7 +198,7 @@ func NewNodeController(
         glog.V(0).Infof("Sending events to api server.")
         eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
     } else {
-        glog.V(0).Infof("No api server defined - no events will be sent to API server.")
+        glog.Fatalf("kubeClient is nil when starting NodeController")
     }
 
     if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 33333083c06..4c183fa603b 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -550,6 +550,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
         evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
         testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
     nodeController.now = func() metav1.Time { return fakeNow }
+    nodeController.recorder = testutil.NewFakeRecorder()
     for _, ds := range item.daemonSets {
         nodeController.daemonSetInformer.Informer().GetStore().Add(&ds)
     }
@@ -694,6 +695,7 @@ func TestPodStatusChange(t *testing.T) {
         evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
         testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
     nodeController.now = func() metav1.Time { return fakeNow }
+    nodeController.recorder = testutil.NewFakeRecorder()
     if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -1213,6 +1215,7 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
     nodeController.enterPartialDisruptionFunc = func(nodeNum int) float32 {
         return testRateLimiterQPS
     }
+    nodeController.recorder = testutil.NewFakeRecorder()
     nodeController.enterFullDisruptionFunc = func(nodeNum int) float32 {
         return testRateLimiterQPS
     }
@@ -1303,6 +1306,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
         testNodeMonitorPeriod, nil, nil, 0, false)
     nodeController.cloud = &fakecloud.FakeCloud{}
     nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
+    nodeController.recorder = testutil.NewFakeRecorder()
     nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
         return false, nil
     }
@@ -1570,6 +1574,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
         testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
         testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
     nodeController.now = func() metav1.Time { return fakeNow }
+    nodeController.recorder = testutil.NewFakeRecorder()
     if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -1803,6 +1808,7 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
         testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
         testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
     nodeController.now = func() metav1.Time { return fakeNow }
+    nodeController.recorder = testutil.NewFakeRecorder()
     if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
         t.Errorf("unexpected error: %v", err)
     }
diff --git a/pkg/controller/node/taint_controller.go b/pkg/controller/node/taint_controller.go
index b0714b8d80e..f81d1049053 100644
--- a/pkg/controller/node/taint_controller.go
+++ b/pkg/controller/node/taint_controller.go
@@ -30,6 +30,9 @@ import (
     "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/types"
 
+    v1core "k8s.io/client-go/kubernetes/typed/core/v1"
+    clientv1 "k8s.io/client-go/pkg/api/v1"
+    "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/workqueue"
 
     "github.com/golang/glog"
@@ -94,7 +97,9 @@ type podUpdateItem struct {
 // NoExecuteTaint manager listens to Taint/Toleration changes and is resposible for removing Pods
 // from Nodes tainted with NoExecute Taints.
 type NoExecuteTaintManager struct {
-    client clientset.Interface
+    client   clientset.Interface
+    recorder record.EventRecorder
+
     taintEvictionQueue *TimedWorkerQueue
     // keeps a map from nodeName to all noExecute taints on that Node
     taintedNodesLock sync.Mutex
@@ -107,11 +112,14 @@ type NoExecuteTaintManager struct {
     podUpdateQueue  workqueue.Interface
 }
 
-func deletePodHandler(c clientset.Interface) func(args *WorkArgs) error {
+func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
     return func(args *WorkArgs) error {
         ns := args.NamespacedName.Namespace
         name := args.NamespacedName.Name
         glog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
+        if emitEventFunc != nil {
+            emitEventFunc(args.NamespacedName)
+        }
         var err error
         for i := 0; i < retries; i++ {
             err = c.Core().Pods(ns).Delete(name, &metav1.DeleteOptions{})
@@ -124,7 +132,7 @@ func deletePodHandler(c clientset.Interface) func(args *WorkArgs) error {
     }
 }
 
-func getNonExecuteTaints(taints []v1.Taint) []v1.Taint {
+func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
     result := []v1.Taint{}
     for i := range taints {
         if taints[i].Effect == v1.TaintEffectNoExecute {
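The deletePodHandler change above emits the eviction event before attempting the delete, then retries the delete a fixed number of times. A condensed sketch of that flow (the helper name is hypothetical and the bare retry loop simplifies the real handler's error handling; `retries` is the package constant the loop above already uses):

```go
package node

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
)

// deleteWithEvent sketches the handler built by deletePodHandler: surface the
// eviction as an event first, then retry the actual delete.
func deleteWithEvent(c clientset.Interface, nsName types.NamespacedName, emit func(types.NamespacedName)) error {
	if emit != nil {
		emit(nsName)
	}
	var err error
	for i := 0; i < retries; i++ {
		err = c.Core().Pods(nsName.Namespace).Delete(nsName.Name, &metav1.DeleteOptions{})
		if err == nil {
			return nil
		}
	}
	return err
}
```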
@@ -156,19 +164,24 @@ func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, er
 
 // Returns minimal toleration time from the given slice, or -1 if it's infinite.
 func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
     minTolerationTime := int64(-1)
+    if len(tolerations) == 0 {
+        return 0
+    }
+    if tolerations[0].TolerationSeconds != nil {
+        tolerationSeconds := *(tolerations[0].TolerationSeconds)
+        if tolerationSeconds <= 0 {
+            return 0
+        } else {
+            minTolerationTime = tolerationSeconds
+        }
+    }
     for i := range tolerations {
         if tolerations[i].TolerationSeconds != nil {
-            if minTolerationTime < 0 {
-                minTolerationTime = *(tolerations[i].TolerationSeconds)
-            } else {
-                tolerationSeconds := *(tolerations[i].TolerationSeconds)
-                if tolerationSeconds < minTolerationTime {
-                    if tolerationSeconds < 0 {
-                        minTolerationTime = 0
-                    } else {
-                        minTolerationTime = tolerationSeconds
-                    }
-                }
-            }
+            tolerationSeconds := *(tolerations[i].TolerationSeconds)
+            if tolerationSeconds <= 0 {
+                return 0
+            } else if tolerationSeconds < minTolerationTime {
+                minTolerationTime = tolerationSeconds
+            }
         }
     }
@@ -178,20 +191,34 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
 // NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
 // communicate with the API server.
 func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
-    return &NoExecuteTaintManager{
-        client:             c,
-        taintEvictionQueue: CreateWorkerQueue(deletePodHandler(c)),
-        taintedNodes:       make(map[string][]v1.Taint),
-        nodeUpdateChannel:  make(chan *nodeUpdateItem, nodeUpdateChannelSize),
-        podUpdateChannel:   make(chan *podUpdateItem, podUpdateChannelSize),
+    eventBroadcaster := record.NewBroadcaster()
+    recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"})
+    eventBroadcaster.StartLogging(glog.Infof)
+    if c != nil {
+        glog.V(0).Infof("Sending events to api server.")
+        eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(c.Core().RESTClient()).Events("")})
+    } else {
+        glog.Fatalf("kubeClient is nil when starting NodeController")
+    }
+
+    tm := &NoExecuteTaintManager{
+        client:            c,
+        recorder:          recorder,
+        taintedNodes:      make(map[string][]v1.Taint),
+        nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
+        podUpdateChannel:  make(chan *podUpdateItem, podUpdateChannelSize),
         nodeUpdateQueue:   workqueue.New(),
         podUpdateQueue:    workqueue.New(),
     }
+    tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))
+
+    return tm
 }
 
 // Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
 func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
+    glog.V(0).Infof("Starting NoExecuteTaintManager")
     // Functions that are responsible for taking work items out of the workqueues and putting them
     // into channels.
     go func(stopCh <-chan struct{}) {
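For reference, the reworked getMinTolerationTime above short-circuits as early as it can: no tolerations means no grace period, any non-positive tolerationSeconds means "evict now", and otherwise the smallest finite value wins (per its doc comment, an unbounded toleration yields -1). A hedged walk-through — the function below is illustrative and not part of the patch:

```go
package node

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/v1"
)

// sketchMinTolerationTime shows the expected behaviour of getMinTolerationTime.
func sketchMinTolerationTime() {
	seconds := func(s int64) *int64 { return &s }

	// No tolerations at all: no grace period.
	fmt.Println(getMinTolerationTime(nil)) // 0s

	// Any non-positive tolerationSeconds short-circuits to "evict immediately".
	fmt.Println(getMinTolerationTime([]v1.Toleration{{TolerationSeconds: seconds(0)}})) // 0s

	// Otherwise the smallest finite tolerationSeconds wins (100s and 5s -> 5s).
	fmt.Println(getMinTolerationTime([]v1.Toleration{
		{TolerationSeconds: seconds(100)},
		{TolerationSeconds: seconds(5)},
	}))
}
```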
@@ -294,7 +321,7 @@ func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node)
             return
         }
     }
-    oldTaints = getNonExecuteTaints(oldTaints)
+    oldTaints = getNoExecuteTaints(oldTaints)
 
     newTaints := []v1.Taint{}
     if newNode != nil {
@@ -304,7 +331,7 @@ func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node)
             return
         }
     }
-    newTaints = getNonExecuteTaints(newTaints)
+    newTaints = getNoExecuteTaints(newTaints)
 
     if oldNode != nil && newNode != nil && api.Semantic.DeepEqual(oldTaints, newTaints) {
         return
@@ -318,6 +345,12 @@ func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node)
     tc.nodeUpdateQueue.Add(updateItemInterface(updateItem))
 }
 
+func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) {
+    if tc.taintEvictionQueue.CancelWork(nsName.String()) {
+        tc.emitCancelPodDeletionEvent(nsName)
+    }
+}
+
 func (tc *NoExecuteTaintManager) processPodOnNode(
     podNamespacedName types.NamespacedName,
     nodeName string,
@@ -325,11 +358,14 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
     taints []v1.Taint,
     now time.Time,
 ) {
+    if len(taints) == 0 {
+        tc.cancelWorkWithEvent(podNamespacedName)
+    }
     allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations)
     if !allTolerated {
-        glog.V(2).Infof("Not all taints are tolerated after upgrade for Pod %v on %v", podNamespacedName.String(), nodeName)
+        glog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)
         // We're canceling scheduled work (if any), as we're going to delete the Pod right away.
-        tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
+        tc.cancelWorkWithEvent(podNamespacedName)
         tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())
         return
     }
@@ -348,7 +384,7 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
         if startTime.Add(minTolerationTime).Before(triggerTime) {
             return
         } else {
-            tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
+            tc.cancelWorkWithEvent(podNamespacedName)
         }
     }
     tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
@@ -359,14 +395,14 @@ func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
     if podUpdate.newPod == nil {
         pod := podUpdate.oldPod
         podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
-        glog.V(4).Infof("Noticed pod deletion: %v", podNamespacedName.String())
-        tc.taintEvictionQueue.CancelWork(podNamespacedName.String())
+        glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
+        tc.cancelWorkWithEvent(podNamespacedName)
         return
     }
     // Create or Update
     pod := podUpdate.newPod
     podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
-    glog.V(4).Infof("Noticed pod update: %v", podNamespacedName.String())
+    glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
     nodeName := pod.Spec.NodeName
     if nodeName == "" {
         return
@@ -377,6 +413,8 @@ func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
         taints, ok := tc.taintedNodes[nodeName]
         return taints, ok
     }()
+    // It's possible that Node was deleted, or Taints were removed before, which triggered
+    // eviction cancelling if it was needed.
     if !ok {
         return
     }
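The processPodOnNode changes above route every cancellation through cancelWorkWithEvent, so a cancelled eviction is also surfaced as an event. The scheduling decision itself reduces to three cases; the function below is an illustrative condensation of that decision, not part of the patch:

```go
package node

import (
	"time"

	"k8s.io/kubernetes/pkg/api/v1"
)

// evictionDecision summarises how processPodOnNode decides when a pod on a node
// with the given NoExecute taints should be evicted. It returns the time at which
// the eviction should fire and whether one is needed at all.
func evictionDecision(taints []v1.Taint, tolerations []v1.Toleration, start time.Time) (time.Time, bool) {
	allTolerated, used := v1.GetMatchingTolerations(taints, tolerations)
	if !allTolerated {
		// Not every taint is tolerated: delete the pod right away.
		return time.Now(), true
	}
	minTolerationTime := getMinTolerationTime(used)
	if minTolerationTime < 0 {
		// The matching tolerations are unbounded: never evict.
		return time.Time{}, false
	}
	// Tolerated only for a finite time: evict once the shortest toleration runs out.
	return start.Add(minTolerationTime), true
}
```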
@@ -387,20 +425,25 @@ func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
     // Delete
     if nodeUpdate.newNode == nil {
         node := nodeUpdate.oldNode
-        glog.V(4).Infof("Noticed node deletion: %v", node.Name)
+        glog.V(4).Infof("Noticed node deletion: %#v", node.Name)
         tc.taintedNodesLock.Lock()
         defer tc.taintedNodesLock.Unlock()
         delete(tc.taintedNodes, node.Name)
         return
     }
     // Create or Update
-    glog.V(4).Infof("Noticed node update: %v", nodeUpdate)
+    glog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
     node := nodeUpdate.newNode
     taints := nodeUpdate.newTaints
     func() {
         tc.taintedNodesLock.Lock()
         defer tc.taintedNodesLock.Unlock()
-        tc.taintedNodes[node.Name] = taints
+        glog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
+        if len(taints) == 0 {
+            delete(tc.taintedNodes, node.Name)
+        } else {
+            tc.taintedNodes[node.Name] = taints
+        }
     }()
     pods, err := getPodsAssignedToNode(tc.client, node.Name)
     if err != nil {
@@ -410,10 +453,11 @@ func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
     if len(pods) == 0 {
         return
     }
+    // Short circuit, to make this controller a bit faster.
     if len(taints) == 0 {
-        glog.V(4).Infof("All taints were removed from the Node. Cancelling all evictions...")
+        glog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
         for i := range pods {
-            tc.taintEvictionQueue.CancelWork(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name}.String())
+            tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
         }
         return
     }
@@ -430,3 +474,27 @@ func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {
         tc.processPodOnNode(podNamespacedName, node.Name, tolerations, taints, now)
     }
 }
+
+func (tc *NoExecuteTaintManager) emitPodDeletionEvent(nsName types.NamespacedName) {
+    if tc.recorder == nil {
+        return
+    }
+    ref := &clientv1.ObjectReference{
+        Kind:      "Pod",
+        Name:      nsName.Name,
+        Namespace: nsName.Namespace,
+    }
+    tc.recorder.Eventf(ref, clientv1.EventTypeNormal, "TaintManagerEviction", "Marking for deletion Pod %s", nsName.String())
+}
+
+func (tc *NoExecuteTaintManager) emitCancelPodDeletionEvent(nsName types.NamespacedName) {
+    if tc.recorder == nil {
+        return
+    }
+    ref := &clientv1.ObjectReference{
+        Kind:      "Pod",
+        Name:      nsName.Name,
+        Namespace: nsName.Namespace,
+    }
+    tc.recorder.Eventf(ref, clientv1.EventTypeNormal, "TaintManagerEviction", "Cancelling deletion of Pod %s", nsName.String())
+}
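The test helpers added in the test file below (addTaintsToNode and addToleration) encode taints in a node annotation and tolerations in the scheduler's alpha pod annotation, as the code under test expects at this point. A condensed, illustrative sketch of that encoding — the helper name and the literal values are made up for the example:

```go
package node

import (
	"encoding/json"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/kubernetes/pkg/api/v1"
)

// annotateForTaintTest shows the annotation shapes the new test helpers build.
func annotateForTaintTest(node *v1.Node, pod *v1.Pod) error {
	taints := []v1.Taint{{
		Key:       "testTaint1",
		Value:     "test1",
		Effect:    v1.TaintEffectNoExecute,
		TimeAdded: metav1.Now(),
	}}
	data, err := json.Marshal(taints)
	if err != nil {
		return err
	}
	if node.Annotations == nil {
		node.Annotations = map[string]string{}
	}
	node.Annotations[v1.TaintsAnnotationKey] = string(data)

	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] =
		`[{"key": "testTaint1", "value": "test1", "effect": "NoExecute", "tolerationSeconds": 60}]`
	return nil
}
```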
diff --git a/pkg/controller/node/taint_controller_test.go b/pkg/controller/node/taint_controller_test.go
index 789197311fc..51404a36413 100644
--- a/pkg/controller/node/taint_controller_test.go
+++ b/pkg/controller/node/taint_controller_test.go
@@ -19,6 +19,7 @@ package node
 import (
     "encoding/json"
     "fmt"
+    "sort"
     "testing"
     "time"
 
@@ -27,9 +28,94 @@ import (
     "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
     "k8s.io/kubernetes/pkg/controller/node/testutil"
 
+    "github.com/golang/glog"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    clienttesting "k8s.io/client-go/testing"
 )
 
+func createNoExecuteTaint(index int) v1.Taint {
+    return v1.Taint{
+        Key:       "testTaint" + fmt.Sprintf("%v", index),
+        Value:     "test" + fmt.Sprintf("%v", index),
+        Effect:    v1.TaintEffectNoExecute,
+        TimeAdded: metav1.Now(),
+    }
+}
+
+func addToleration(pod *v1.Pod, index int, duration int64) *v1.Pod {
+    if pod.Annotations == nil {
+        pod.Annotations = map[string]string{}
+    }
+    if duration < 0 {
+        pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
+    [
+        {
+            "key": "testTaint` + fmt.Sprintf("%v", index) + `",
+            "value": "test` + fmt.Sprintf("%v", index) + `",
+            "effect": "` + string(v1.TaintEffectNoExecute) + `"
+        }
+    ]`
+    } else {
+        pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
+    [
+        {
+            "key": "testTaint` + fmt.Sprintf("%v", index) + `",
+            "value": "test` + fmt.Sprintf("%v", index) + `",
+            "effect": "` + string(v1.TaintEffectNoExecute) + `",
+            "tolerationSeconds": ` + fmt.Sprintf("%v", duration) + `
+        }
+    ]`
+    }
+    return pod
+}
+
+func addTaintsToNode(node *v1.Node, key, value string, indices []int) *v1.Node {
+    taints := []v1.Taint{}
+    for _, index := range indices {
+        taints = append(taints, createNoExecuteTaint(index))
+    }
+    taintsData, err := json.Marshal(taints)
+    if err != nil {
+        panic(err)
+    }
+
+    if node.Annotations == nil {
+        node.Annotations = make(map[string]string)
+    }
+    node.Annotations[v1.TaintsAnnotationKey] = string(taintsData)
+    return node
+}
+
+type timestampedPod struct {
+    name      string
+    timestamp time.Duration
+}
+
+type durationSlice []timestampedPod
+
+func (a durationSlice) Len() int           { return len(a) }
+func (a durationSlice) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
+func (a durationSlice) Less(i, j int) bool { return a[i].timestamp < a[j].timestamp }
+
+func TestFilterNoExecuteTaints(t *testing.T) {
+    taints := []v1.Taint{
+        {
+            Key:    "one",
+            Value:  "one",
+            Effect: v1.TaintEffectNoExecute,
+        },
+        {
+            Key:    "two",
+            Value:  "two",
+            Effect: v1.TaintEffectNoSchedule,
+        },
+    }
+    taints = getNoExecuteTaints(taints)
+    if len(taints) != 1 || taints[0].Key != "one" {
+        t.Errorf("Filtering doesn't work. Got %v", taints)
+    }
+}
+
 func TestComputeTaintDifference(t *testing.T) {
     testCases := []struct {
         lhs []v1.Taint
@@ -123,42 +209,6 @@ func TestComputeTaintDifference(t *testing.T) {
     }
 }
 
-func createNoExecuteTaint(index int) v1.Taint {
-    return v1.Taint{
-        Key:       "testTaint" + fmt.Sprintf("%v", index),
-        Value:     "test" + fmt.Sprintf("%v", index),
-        Effect:    v1.TaintEffectNoExecute,
-        TimeAdded: metav1.Now(),
-    }
-}
-
-func addToleration(pod *v1.Pod, index int, duration int64) *v1.Pod {
-    if pod.Annotations == nil {
-        pod.Annotations = map[string]string{}
-    }
-    if duration < 0 {
-        pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
-    [
-        {
-            "key": "testTaint` + fmt.Sprintf("%v", index) + `",
-            "value": "test` + fmt.Sprintf("%v", index) + `",
-            "effect": "` + string(v1.TaintEffectNoExecute) + `"
-        }
-    ]`
-    } else {
-        pod.Annotations["scheduler.alpha.kubernetes.io/tolerations"] = `
-    [
-        {
-            "key": "testTaint` + fmt.Sprintf("%v", index) + `",
-            "value": "test` + fmt.Sprintf("%v", index) + `",
-            "effect": "` + string(v1.TaintEffectNoExecute) + `",
-            "tolerationSeconds": ` + fmt.Sprintf("%v", duration) + `
-        }
-    ]`
-    }
-    return pod
-}
-
 func TestCreatePod(t *testing.T) {
     testCases := []struct {
         description  string
@@ -216,6 +266,7 @@ func TestCreatePod(t *testing.T) {
         stopCh := make(chan struct{})
         fakeClientset := fake.NewSimpleClientset()
         controller := NewNoExecuteTaintManager(fakeClientset)
+        controller.recorder = testutil.NewFakeRecorder()
         go controller.Run(stopCh)
         controller.taintedNodes = item.taintedNodes
         controller.PodUpdated(nil, item.pod)
@@ -239,6 +290,7 @@ func TestDeletePod(t *testing.T) {
         stopCh := make(chan struct{})
         fakeClientset := fake.NewSimpleClientset()
         controller := NewNoExecuteTaintManager(fakeClientset)
+        controller.recorder = testutil.NewFakeRecorder()
         go controller.Run(stopCh)
         controller.taintedNodes = map[string][]v1.Taint{
            "node1": {createNoExecuteTaint(1)},
@@ -301,6 +353,7 @@ func TestUpdatePod(t *testing.T) {
         stopCh := make(chan struct{})
         fakeClientset := fake.NewSimpleClientset()
         controller := NewNoExecuteTaintManager(fakeClientset)
+        controller.recorder = testutil.NewFakeRecorder()
         go controller.Run(stopCh)
 
         controller.taintedNodes = item.taintedNodes
@@ -327,23 +380,6 @@ func TestUpdatePod(t *testing.T) {
     }
 }
 
-func addTaintsToNode(node *v1.Node, key, value string, indices []int) *v1.Node {
-    taints := []v1.Taint{}
-    for _, index := range indices {
-        taints = append(taints, createNoExecuteTaint(index))
-    }
-    taintsData, err := json.Marshal(taints)
-    if err != nil {
-        panic(err)
-    }
-
-    if node.Annotations == nil {
-        node.Annotations = make(map[string]string)
-    }
-    node.Annotations[v1.TaintsAnnotationKey] = string(taintsData)
-    return node
-}
-
 func TestCreateNode(t *testing.T) {
     testCases := []struct {
         description string
@@ -381,6 +417,7 @@ func TestCreateNode(t *testing.T) {
         stopCh := make(chan struct{})
         fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
         controller := NewNoExecuteTaintManager(fakeClientset)
+        controller.recorder = testutil.NewFakeRecorder()
         go controller.Run(stopCh)
         controller.NodeUpdated(nil, item.node)
         // wait a bit
@@ -403,6 +440,7 @@ func TestDeleteNode(t *testing.T) {
     stopCh := make(chan struct{})
     fakeClientset := fake.NewSimpleClientset()
     controller := NewNoExecuteTaintManager(fakeClientset)
+    controller.recorder = testutil.NewFakeRecorder()
     controller.taintedNodes = map[string][]v1.Taint{
         "node1": {createNoExecuteTaint(1)},
     }
@@ -464,12 +502,57 @@ func TestUpdateNode(t *testing.T) {
             expectDelete:    false,
             additionalSleep: 1500 * time.Millisecond,
         },
+        {
+            description: "Pod with multiple tolerations is evicted when the first one runs out",
+            pods: []v1.Pod{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Namespace: "default",
+                        Name:      "pod1",
+                        Annotations: map[string]string{
+                            "scheduler.alpha.kubernetes.io/tolerations": `
+                            [
+                                {
+                                    "key": "testTaint1",
+                                    "value": "test1",
+                                    "effect": "` + string(v1.TaintEffectNoExecute) + `",
+                                    "tolerationSeconds": ` + fmt.Sprintf("%v", 1) + `
+                                },
+                                {
+                                    "key": "testTaint2",
+                                    "value": "test2",
+                                    "effect": "` + string(v1.TaintEffectNoExecute) + `",
+                                    "tolerationSeconds": ` + fmt.Sprintf("%v", 100) + `
+                                }
+                            ]
+                            `,
+                        },
+                    },
+                    Spec: v1.PodSpec{
+                        NodeName: "node1",
+                    },
+                    Status: v1.PodStatus{
+                        Conditions: []v1.PodCondition{
+                            {
+                                Type:   v1.PodReady,
+                                Status: v1.ConditionTrue,
+                            },
+                        },
+                    },
+                },
+            },
+            oldNode:         testutil.NewNode("node1"),
+            newNode:         addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
+            expectDelete:    true,
+            additionalSleep: 1500 * time.Millisecond,
+        },
     }
 
     for _, item := range testCases {
         stopCh := make(chan struct{})
         fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
         controller := NewNoExecuteTaintManager(fakeClientset)
+        controller.recorder = testutil.NewFakeRecorder()
         go controller.Run(stopCh)
         controller.NodeUpdated(item.oldNode, item.newNode)
         // wait a bit
@@ -490,3 +573,85 @@ func TestUpdateNode(t *testing.T) {
         close(stopCh)
     }
 }
+
+func TestUpdateNodeWithMultiplePods(t *testing.T) {
+    testCases := []struct {
+        description         string
+        pods                []v1.Pod
+        oldNode             *v1.Node
+        newNode             *v1.Node
+        expectedDeleteTimes durationSlice
+    }{
+        {
+            description: "Pods with different toleration times are evicted appropriately",
+            pods: []v1.Pod{
+                *testutil.NewPod("pod1", "node1"),
+                *addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
+                *addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
+            },
+            oldNode: testutil.NewNode("node1"),
+            newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
+            expectedDeleteTimes: durationSlice{
+                {"pod1", 0},
+                {"pod2", time.Second},
+            },
+        },
+        {
+            description: "Evict all pods not matching all taints instantly",
+            pods: []v1.Pod{
+                *testutil.NewPod("pod1", "node1"),
+                *addToleration(testutil.NewPod("pod2", "node1"), 1, 1),
+                *addToleration(testutil.NewPod("pod3", "node1"), 1, -1),
+            },
+            oldNode: testutil.NewNode("node1"),
+            newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1, 2}),
+            expectedDeleteTimes: durationSlice{
+                {"pod1", 0},
+                {"pod2", 0},
+            },
+        },
+    }
+
+    for _, item := range testCases {
+        stopCh := make(chan struct{})
+        fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
+        sort.Sort(item.expectedDeleteTimes)
+        controller := NewNoExecuteTaintManager(fakeClientset)
+        controller.recorder = testutil.NewFakeRecorder()
+        go controller.Run(stopCh)
+        controller.NodeUpdated(item.oldNode, item.newNode)
+
+        sleptAlready := time.Duration(0)
+        for i := range item.expectedDeleteTimes {
+            var increment time.Duration
+            if i == 0 || item.expectedDeleteTimes[i-1].timestamp != item.expectedDeleteTimes[i].timestamp {
+                if i == len(item.expectedDeleteTimes)-1 || item.expectedDeleteTimes[i+1].timestamp == item.expectedDeleteTimes[i].timestamp {
+                    increment = 200 * time.Millisecond
+                } else {
+                    increment = ((item.expectedDeleteTimes[i+1].timestamp - item.expectedDeleteTimes[i].timestamp) / time.Duration(2))
+                }
+                sleepTime := item.expectedDeleteTimes[i].timestamp - sleptAlready + increment
+                glog.Infof("Sleeping for %v", sleepTime)
+                time.Sleep(sleepTime)
+                sleptAlready = item.expectedDeleteTimes[i].timestamp + increment
+            }
+
+            podDeleted := false
+            for _, action := range fakeClientset.Actions() {
+                deleteAction, ok := action.(clienttesting.DeleteActionImpl)
+                if !ok {
+                    glog.Infof("Found not-delete action with verb %v. Ignoring.", action.GetVerb())
+                    continue
+                }
+                if deleteAction.GetResource().Resource == "pods" && deleteAction.GetName() == item.expectedDeleteTimes[i].name {
+                    podDeleted = true
+                }
+            }
+            if !podDeleted {
+                t.Errorf("%v: Unexpected test result. Expected delete %v which didn't happen", item.description, item.expectedDeleteTimes[i].name)
+            }
+        }
+
+        close(stopCh)
+    }
+}
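The tests above all inject testutil.NewFakeRecorder() so that no events leave the process during unit tests, and the testutil change below makes that fake safe for the taint manager's concurrent goroutines by guarding its slice with a mutex. A minimal, illustrative sketch of the same pattern (the type and field names here are made up):

```go
package testutil

import "sync"

// threadSafeEventLog mirrors the locking the FakeRecorder change below adds:
// events may be recorded from several goroutines, so appends must be serialized.
type threadSafeEventLog struct {
	sync.Mutex
	events []string
}

func (l *threadSafeEventLog) record(msg string) {
	l.Lock()
	defer l.Unlock()
	l.events = append(l.events, msg)
}
```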
diff --git a/pkg/controller/node/testutil/test_utils.go b/pkg/controller/node/testutil/test_utils.go
index 3f7a809a932..12093235aad 100644
--- a/pkg/controller/node/testutil/test_utils.go
+++ b/pkg/controller/node/testutil/test_utils.go
@@ -29,6 +29,7 @@ import (
     "k8s.io/apimachinery/pkg/types"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/watch"
+    clientv1 "k8s.io/client-go/pkg/api/v1"
     "k8s.io/client-go/util/clock"
 
     "k8s.io/kubernetes/pkg/api"
@@ -36,6 +37,8 @@ import (
     "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
     v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
     utilnode "k8s.io/kubernetes/pkg/util/node"
+
+    "github.com/golang/glog"
 )
 
 // FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
@@ -227,6 +230,7 @@ func (m *FakeNodeHandler) Patch(name string, pt types.PatchType, data []byte, su
 
 // FakeRecorder is used as a fake during testing.
 type FakeRecorder struct {
+    sync.Mutex
     source clientv1.EventSource
     Events []*clientv1.Event
     clock  clock.Clock
@@ -247,20 +251,21 @@ func (f *FakeRecorder) PastEventf(obj runtime.Object, timestamp metav1.Time, eve
 }
 
 func (f *FakeRecorder) generateEvent(obj runtime.Object, timestamp metav1.Time, eventtype, reason, message string) {
-    ref, err := v1.GetReference(api.Scheme, obj)
+    f.Lock()
+    defer f.Unlock()
+    ref, err := clientv1.GetReference(api.Scheme, obj)
     if err != nil {
+        glog.Errorf("Encountered error while getting reference: %v", err)
         return
     }
     event := f.makeEvent(ref, eventtype, reason, message)
     event.Source = f.source
     if f.Events != nil {
-        fmt.Println("write event")
         f.Events = append(f.Events, event)
     }
 }
 
-func (f *FakeRecorder) makeEvent(ref *v1.ObjectReference, eventtype, reason, message string) *clientv1.Event {
-    fmt.Println("make event")
+func (f *FakeRecorder) makeEvent(ref *clientv1.ObjectReference, eventtype, reason, message string) *clientv1.Event {
     t := metav1.Time{Time: f.clock.Now()}
     namespace := ref.Namespace
     if namespace == "" {
diff --git a/pkg/controller/node/timed_workers.go b/pkg/controller/node/timed_workers.go
index c9d0d30427c..906e6e09350 100644
--- a/pkg/controller/node/timed_workers.go
+++ b/pkg/controller/node/timed_workers.go
@@ -25,7 +25,7 @@ import (
     "github.com/golang/glog"
 )
 
-// WorkArgs keeps arguments that will be passed to tha function executed by the worker.
+// WorkArgs keeps arguments that will be passed to the function executed by the worker.
 type WorkArgs struct {
     NamespacedName types.NamespacedName
 }
@@ -50,7 +50,7 @@ type TimedWorker struct {
 
 // CreateWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
 func CreateWorker(args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(args *WorkArgs) error) *TimedWorker {
-    delay := fireAt.Sub(time.Now())
+    delay := fireAt.Sub(createdAt)
     if delay <= 0 {
         go f(args)
         return nil
@@ -71,9 +71,10 @@ func (w *TimedWorker) Cancel() {
     }
 }
 
-// TimedWorkerQueue keeps a set of TimedWorkers that still wait for execution.
+// TimedWorkerQueue keeps a set of TimedWorkers that are still waiting for execution.
 type TimedWorkerQueue struct {
     sync.Mutex
+    // map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
     workers  map[string]*TimedWorker
     workFunc func(args *WorkArgs) error
 }
@@ -93,6 +94,8 @@ func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs)
         q.Lock()
         defer q.Unlock()
         if err == nil {
+            // To avoid duplicated calls we keep the key in the queue, to prevent
+            // subsequent additions.
             q.workers[key] = nil
         } else {
             delete(q.workers, key)
@@ -104,6 +107,7 @@ func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(args *WorkArgs)
 // AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.
 func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt time.Time) {
     key := args.KeyFromWorkArgs()
+    glog.V(4).Infof("Adding TimedWorkerQueue item %v at %v to be fired at %v", key, createdAt, fireAt)
     q.Lock()
     defer q.Unlock()
 
@@ -112,21 +116,24 @@ func (q *TimedWorkerQueue) AddWork(args *WorkArgs, createdAt time.Time, fireAt t
         return
     }
     worker := CreateWorker(args, createdAt, fireAt, q.getWrappedWorkerFunc(key))
-    if worker == nil {
-        return
-    }
     q.workers[key] = worker
 }
 
-// CancelWork removes scheduled function execution from the queue.
-func (q *TimedWorkerQueue) CancelWork(key string) {
+// CancelWork removes scheduled function execution from the queue. Returns true if work was cancelled.
+func (q *TimedWorkerQueue) CancelWork(key string) bool {
+    glog.V(4).Infof("Cancelling TimedWorkerQueue item %v at %v", key, time.Now())
     q.Lock()
     defer q.Unlock()
     worker, found := q.workers[key]
+    result := false
     if found {
-        worker.Cancel()
+        if worker != nil {
+            result = true
+            worker.Cancel()
+        }
         delete(q.workers, key)
     }
+    return result
 }
 
 // GetWorkerUnsafe returns a TimedWorker corresponding to the given key.
diff --git a/pkg/controller/node/timed_workers_test.go b/pkg/controller/node/timed_workers_test.go
index 5231c4c83ce..dfc19a3dbd8 100644
--- a/pkg/controller/node/timed_workers_test.go
+++ b/pkg/controller/node/timed_workers_test.go
@@ -26,7 +26,7 @@ import (
 func TestExecute(t *testing.T) {
     testVal := int32(0)
     wg := sync.WaitGroup{}
-    wg.Add(10)
+    wg.Add(5)
     queue := CreateWorkerQueue(func(args *WorkArgs) error {
         atomic.AddInt32(&testVal, 1)
         wg.Done()
@@ -38,6 +38,7 @@ func TestExecute(t *testing.T) {
     queue.AddWork(NewWorkArgs("3", "3"), now, now)
     queue.AddWork(NewWorkArgs("4", "4"), now, now)
     queue.AddWork(NewWorkArgs("5", "5"), now, now)
+    // Adding the same thing a second time should be a no-op
     queue.AddWork(NewWorkArgs("1", "1"), now, now)
     queue.AddWork(NewWorkArgs("2", "2"), now, now)
     queue.AddWork(NewWorkArgs("3", "3"), now, now)
@@ -45,8 +46,8 @@
     queue.AddWork(NewWorkArgs("5", "5"), now, now)
     wg.Wait()
     lastVal := atomic.LoadInt32(&testVal)
-    if lastVal != 10 {
-        t.Errorf("Espected testVal = 10, got %v", lastVal)
+    if lastVal != 5 {
+        t.Errorf("Expected testVal = 5, got %v", lastVal)
     }
 }
 
@@ -88,7 +89,7 @@ func TestCancel(t *testing.T) {
         return nil
     })
     now := time.Now()
-    then := now.Add(time.Second)
+    then := now.Add(100 * time.Millisecond)
     queue.AddWork(NewWorkArgs("1", "1"), now, then)
     queue.AddWork(NewWorkArgs("2", "2"), now, then)
     queue.AddWork(NewWorkArgs("3", "3"), now, then)
@@ -118,7 +119,7 @@ func TestCancelAndReadd(t *testing.T) {
         return nil
     })
     now := time.Now()
-    then := now.Add(time.Second)
+    then := now.Add(100 * time.Millisecond)
     queue.AddWork(NewWorkArgs("1", "1"), now, then)
     queue.AddWork(NewWorkArgs("2", "2"), now, then)
     queue.AddWork(NewWorkArgs("3", "3"), now, then)
diff --git a/test/e2e/taints_test.go b/test/e2e/taints_test.go
index 540ac2a1b4a..b9d84235bbc 100644
--- a/test/e2e/taints_test.go
+++ b/test/e2e/taints_test.go
@@ -128,7 +128,7 @@ func createPodForTaintsTest(hasToleration bool, tolerationSeconds int, podName,
     }
 }
 
-// Creates and startes a controller (informer) that watches updates on a pod in given namespace with given name. It puts a new
+// Creates and starts a controller (informer) that watches updates on a pod in given namespace with given name. It puts a new
 // struct into observedDeletion channel for every deletion it sees.
 func createTestController(cs clientset.Interface, observedDeletions chan struct{}, stopCh chan struct{}, podName, ns string) {
     _, controller := cache.NewInformer(
diff --git a/test/utils/runners.go b/test/utils/runners.go
index 96c437fa814..8eb8616416b 100644
--- a/test/utils/runners.go
+++ b/test/utils/runners.go
@@ -71,7 +71,6 @@ func WaitUntilPodIsScheduled(c clientset.Interface, name, namespace string, time
 }
 
 func RunPodAndGetNodeName(c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
-    retries := 5
     name := pod.Name
     namespace := pod.Namespace
     var err error