diff --git a/pkg/controller/node/controller_utils.go b/pkg/controller/node/controller_utils.go index 19abd03715c..5af5c866bbb 100644 --- a/pkg/controller/node/controller_utils.go +++ b/pkg/controller/node/controller_utils.go @@ -83,7 +83,7 @@ func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeleteP // deletePods will delete all pods from master running on given node, and return true // if any pods were deleted. -func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, daemonStore cache.StoreToDaemonSetLister) (bool, error) { +func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore cache.StoreToDaemonSetLister) (bool, error) { remaining := false selector := fields.OneTermEqualSelector(api.PodHostField, nodeName) options := api.ListOptions{FieldSelector: selector} @@ -93,7 +93,7 @@ func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n } if len(pods.Items) > 0 { - recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) + recordNodeEvent(recorder, nodeName, nodeUID, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) } for _, pod := range pods.Items { @@ -257,11 +257,11 @@ func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) ( return true, nil } -func recordNodeEvent(recorder record.EventRecorder, nodeName, eventtype, reason, event string) { +func recordNodeEvent(recorder record.EventRecorder, nodeName, nodeUID, eventtype, reason, event string) { ref := &api.ObjectReference{ Kind: "Node", Name: nodeName, - UID: types.UID(nodeName), + UID: types.UID(nodeUID), Namespace: "", } glog.V(2).Infof("Recording %s event message for node %s", event, nodeName) @@ -272,7 +272,7 @@ func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_s ref := 
&api.ObjectReference{ Kind: "Node", Name: node.Name, - UID: types.UID(node.Name), + UID: node.UID, Namespace: "", } glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name) @@ -284,7 +284,7 @@ func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_s // terminatePods will ensure all pods on the given node that are in terminating state are eventually // cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how // long before we should check again (the next deadline for a pod to complete), or an error. -func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) { +func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, nodeUID string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) { // the time before we should try again nextAttempt := time.Duration(0) // have we deleted all pods @@ -320,7 +320,7 @@ func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder if remaining < 0 { remaining = 0 glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace) - recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName)) + recordNodeEvent(recorder, nodeName, nodeUID, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName)) if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil { glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err) complete = false diff --git a/pkg/controller/node/nodecontroller.go 
b/pkg/controller/node/nodecontroller.go index 77aa730ef76..656e1742f02 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -386,14 +386,15 @@ func (nc *NodeController) Run(period time.Duration) { defer nc.evictorLock.Unlock() for k := range nc.zonePodEvictor { nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) { - remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore) + nodeUid, _ := value.UID.(string) + remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) return false, 0 } if remaining { - nc.zoneTerminationEvictor[k].Add(value.Value) + nc.zoneTerminationEvictor[k].Add(value.Value, value.UID) } return true, 0 }) @@ -407,7 +408,8 @@ func (nc *NodeController) Run(period time.Duration) { defer nc.evictorLock.Unlock() for k := range nc.zoneTerminationEvictor { nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) { - completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod) + nodeUid, _ := value.UID.(string) + completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, value.AddedAt, nc.maximumGracePeriod) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err)) return false, 0 @@ -415,7 +417,7 @@ func (nc *NodeController) Run(period time.Duration) { if completed { glog.V(2).Infof("All pods terminated on %s", value.Value) - recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value)) + recordNodeEvent(nc.recorder, value.Value, nodeUid, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value)) return true, 0 } @@ -450,7 +452,7 
@@ func (nc *NodeController) monitorNodeStatus() error { added, deleted := nc.checkForNodeAddedDeleted(nodes) for i := range added { glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name) - recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name)) + recordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name)) nc.knownNodeSet[added[i].Name] = added[i] // When adding new Nodes we need to check if new zone appeared, and if so add new evictor. zone := utilnode.GetZoneKey(added[i]) @@ -468,7 +470,7 @@ func (nc *NodeController) monitorNodeStatus() error { for i := range deleted { glog.V(1).Infof("NodeController observed a Node deletion: %v", deleted[i].Name) - recordNodeEvent(nc.recorder, deleted[i].Name, api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name)) + recordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), api.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", deleted[i].Name)) nc.evictPods(deleted[i]) delete(nc.knownNodeSet, deleted[i].Name) } @@ -540,7 +542,7 @@ func (nc *NodeController) monitorNodeStatus() error { } if !exists { glog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name) - recordNodeEvent(nc.recorder, node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) + recordNodeEvent(nc.recorder, node.Name, string(node.UID), api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) go func(nodeName string) { defer utilruntime.HandleCrash() // Kubelet is not reporting and Cloud Provider says node @@ -867,5 +869,5 @@ func (nc *NodeController) 
cancelPodEviction(node *api.Node) bool { func (nc *NodeController) evictPods(node *api.Node) bool { nc.evictorLock.Lock() defer nc.evictorLock.Unlock() - return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name) + return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)) } diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index 66a0b941c24..29f87b4abcd 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -481,14 +481,16 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { zones := getZones(item.fakeNodeHandler) for _, zone := range zones { nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { - remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore) + nodeUid, _ := value.UID.(string) + remaining, _ := deletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore) if remaining { - nodeController.zoneTerminationEvictor[zone].Add(value.Value) + nodeController.zoneTerminationEvictor[zone].Add(value.Value, nodeUid) } return true, 0 }) nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { - terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod) + nodeUid, _ := value.UID.(string) + terminatePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, value.AddedAt, nodeController.maximumGracePeriod) return true, 0 }) } @@ -1014,14 +1016,16 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) { zones := getZones(fakeNodeHandler) for _, zone := range zones { nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { - remaining, _ := deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore) + uid, _ 
:= value.UID.(string) + remaining, _ := deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore) if remaining { - nodeController.zoneTerminationEvictor[zone].Add(value.Value) + nodeController.zoneTerminationEvictor[zone].Add(value.Value, value.UID) } return true, 0 }) nodeController.zonePodEvictor[zone].Try(func(value TimedValue) (bool, time.Duration) { - terminatePods(fakeNodeHandler, nodeController.recorder, value.Value, value.AddedAt, nodeController.maximumGracePeriod) + uid, _ := value.UID.(string) + terminatePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, value.AddedAt, nodeController.maximumGracePeriod) return true, 0 }) } @@ -1544,7 +1548,8 @@ func TestNodeDeletion(t *testing.T) { t.Errorf("unexpected error: %v", err) } nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) { - deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeController.daemonSetStore) + uid, _ := value.UID.(string) + deletePods(fakeNodeHandler, nodeController.recorder, value.Value, uid, nodeController.daemonSetStore) return true, 0 }) podEvicted := false @@ -1558,6 +1563,72 @@ func TestNodeDeletion(t *testing.T) { } } +func TestNodeEventGeneration(t *testing.T) { + fakeNow := unversioned.Date(2016, 8, 10, 12, 0, 0, 0, time.UTC) + fakeNodeHandler := &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + UID: "1234567890", + CreationTimestamp: unversioned.Date(2016, 8, 10, 0, 0, 0, 0, time.UTC), + }, + Spec: api.NodeSpec{ + ExternalID: "node0", + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionTrue, + // Node status has just been updated. 
+ LastHeartbeatTime: fakeNow, + LastTransitionTime: fakeNow, + }, + }, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceRequestsCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("20G"), + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}), + } + + nodeController, _ := NewNodeControllerFromClient(nil, fakeNodeHandler, 5*time.Minute, testRateLimiterQPS, + testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false) + nodeController.now = func() unversioned.Time { return fakeNow } + fakeRecorder := NewFakeRecorder() + nodeController.recorder = fakeRecorder + if err := nodeController.monitorNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + + fakeNodeHandler.Delete("node0", nil) + if err := nodeController.monitorNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + nodeController.zonePodEvictor[""].Try(func(value TimedValue) (bool, time.Duration) { + nodeUid, _ := value.UID.(string) + deletePods(fakeNodeHandler, nodeController.recorder, value.Value, nodeUid, nodeController.daemonSetStore) + return true, 0 + }) + if len(fakeRecorder.events) != 3 { + t.Fatalf("unexpected events: %v", fakeRecorder.events) + } + if fakeRecorder.events[0].Reason != "RegisteredNode" || fakeRecorder.events[1].Reason != "RemovingNode" || fakeRecorder.events[2].Reason != "DeletingAllPods" { + t.Fatalf("unexpected events generation: %v", fakeRecorder.events) + } + for _, event := range fakeRecorder.events { + involvedObject := event.InvolvedObject + actualUID := string(involvedObject.UID) + if actualUID != "1234567890" { + t.Fatalf("unexpected event uid: %v", actualUID) + } + } +} + func TestCheckPod(t *testing.T) { tcs := []struct { pod api.Pod diff --git a/pkg/controller/node/rate_limited_queue.go b/pkg/controller/node/rate_limited_queue.go index ea6ae8df5e1..96e7f12809a 100644 --- 
a/pkg/controller/node/rate_limited_queue.go +++ b/pkg/controller/node/rate_limited_queue.go @@ -29,7 +29,9 @@ import ( // TimedValue is a value that should be processed at a designated time. type TimedValue struct { - Value string + Value string + // UID could be anything that helps identify the value + UID interface{} AddedAt time.Time ProcessAt time.Time } @@ -200,12 +202,13 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) { } } -// Adds value to the queue to be processed. Won't add the same value a second time if it was already -// added and not removed. -func (q *RateLimitedTimedQueue) Add(value string) bool { +// Adds value to the queue to be processed. Won't add the same value (comparison by value) a second time +// if it was already added and not removed. +func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool { now := now() return q.queue.Add(TimedValue{ Value: value, + UID: uid, AddedAt: now, ProcessAt: now, }) } diff --git a/pkg/controller/node/rate_limited_queue_test.go b/pkg/controller/node/rate_limited_queue_test.go index 44a24b13f18..b8d871ee70b 100644 --- a/pkg/controller/node/rate_limited_queue_test.go +++ b/pkg/controller/node/rate_limited_queue_test.go @@ -40,9 +40,9 @@ func CheckSetEq(lhs, rhs sets.String) bool { func TestAddNode(t *testing.T) { evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) - evictor.Add("first") - evictor.Add("second") - evictor.Add("third") + evictor.Add("first", "11111") + evictor.Add("second", "22222") + evictor.Add("third", "33333") queuePattern := []string{"first", "second", "third"} if len(evictor.queue.queue) != len(queuePattern) { @@ -70,9 +70,9 @@ func TestDelNode(t *testing.T) { return t } evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) - evictor.Add("first") - evictor.Add("second") - evictor.Add("third") + evictor.Add("first", "11111") + evictor.Add("second", "22222") + evictor.Add("third", "33333") evictor.Remove("first") queuePattern := 
[]string{"second", "third"} @@ -92,9 +92,9 @@ func TestDelNode(t *testing.T) { } evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) - evictor.Add("first") - evictor.Add("second") - evictor.Add("third") + evictor.Add("first", "11111") + evictor.Add("second", "22222") + evictor.Add("third", "33333") evictor.Remove("second") queuePattern = []string{"first", "third"} @@ -114,9 +114,9 @@ func TestDelNode(t *testing.T) { } evictor = NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) - evictor.Add("first") - evictor.Add("second") - evictor.Add("third") + evictor.Add("first", "11111") + evictor.Add("second", "22222") + evictor.Add("third", "33333") evictor.Remove("third") queuePattern = []string{"first", "second"} @@ -138,9 +138,9 @@ func TestDelNode(t *testing.T) { func TestTry(t *testing.T) { evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) - evictor.Add("first") - evictor.Add("second") - evictor.Add("third") + evictor.Add("first", "11111") + evictor.Add("second", "22222") + evictor.Add("third", "33333") evictor.Remove("second") deletedMap := sets.NewString() @@ -173,9 +173,9 @@ func TestTryOrdering(t *testing.T) { return current } evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) - evictor.Add("first") - evictor.Add("second") - evictor.Add("third") + evictor.Add("first", "11111") + evictor.Add("second", "22222") + evictor.Add("third", "33333") order := []string{} count := 0 @@ -225,9 +225,9 @@ func TestTryOrdering(t *testing.T) { func TestTryRemovingWhileTry(t *testing.T) { evictor := NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) - evictor.Add("first") - evictor.Add("second") - evictor.Add("third") + evictor.Add("first", "11111") + evictor.Add("second", "22222") + evictor.Add("third", "33333") processing := make(chan struct{}) wait := make(chan struct{}) @@ -271,9 +271,9 @@ func TestTryRemovingWhileTry(t *testing.T) { func TestClear(t *testing.T) { evictor := 
NewRateLimitedTimedQueue(flowcontrol.NewFakeAlwaysRateLimiter()) - evictor.Add("first") - evictor.Add("second") - evictor.Add("third") + evictor.Add("first", "11111") + evictor.Add("second", "22222") + evictor.Add("third", "33333") evictor.Clear() diff --git a/pkg/controller/node/test_utils.go b/pkg/controller/node/test_utils.go index 4ccd4e3f827..7cb327f909f 100644 --- a/pkg/controller/node/test_utils.go +++ b/pkg/controller/node/test_utils.go @@ -18,13 +18,18 @@ package node import ( "errors" + "fmt" "sync" + "time" "k8s.io/kubernetes/pkg/api" apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/clock" utilnode "k8s.io/kubernetes/pkg/util/node" "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" @@ -192,6 +197,67 @@ func (m *FakeNodeHandler) Patch(name string, pt api.PatchType, data []byte, subr return nil, nil } +// FakeRecorder is used as a fake during testing. 
+type FakeRecorder struct { + source api.EventSource + events []*api.Event + clock clock.Clock +} + +func (f *FakeRecorder) Event(obj runtime.Object, eventtype, reason, message string) { + f.generateEvent(obj, unversioned.Now(), eventtype, reason, message) +} + +func (f *FakeRecorder) Eventf(obj runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { + f.Event(obj, eventtype, reason, fmt.Sprintf(messageFmt, args...)) +} + +func (f *FakeRecorder) PastEventf(obj runtime.Object, timestamp unversioned.Time, eventtype, reason, messageFmt string, args ...interface{}) { +} + +func (f *FakeRecorder) generateEvent(obj runtime.Object, timestamp unversioned.Time, eventtype, reason, message string) { + ref, err := api.GetReference(obj) + if err != nil { + return + } + event := f.makeEvent(ref, eventtype, reason, message) + event.Source = f.source + if f.events != nil { + fmt.Println("write event") + f.events = append(f.events, event) + } +} + +func (f *FakeRecorder) makeEvent(ref *api.ObjectReference, eventtype, reason, message string) *api.Event { + fmt.Println("make event") + t := unversioned.Time{Time: f.clock.Now()} + namespace := ref.Namespace + if namespace == "" { + namespace = api.NamespaceDefault + } + return &api.Event{ + ObjectMeta: api.ObjectMeta{ + Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), + Namespace: namespace, + }, + InvolvedObject: *ref, + Reason: reason, + Message: message, + FirstTimestamp: t, + LastTimestamp: t, + Count: 1, + Type: eventtype, + } +} + +func NewFakeRecorder() *FakeRecorder { + return &FakeRecorder{ + source: api.EventSource{Component: "nodeControllerTest"}, + events: []*api.Event{}, + clock: clock.NewFakeClock(time.Now()), + } +} + func newNode(name string) *api.Node { return &api.Node{ ObjectMeta: api.ObjectMeta{Name: name},