Merge pull request #39011 from wojtek-t/node_controller_listing_from_cache

Automatic merge from submit-queue

NodeController: list nodes from the local cache instead of the apiserver's cache

This reduces load on the apiserver.
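
The old code listed nodes through the client with `ResourceVersion: "0"`, which is typically served from the apiserver's watch cache but still costs a request, authn/authz, and serialization on every monitoring pass. The new code reads the NodeController's own watch-backed local store, so steady-state reads never leave the process. A minimal, self-contained sketch of the pattern (not the actual controller code; all names below are hypothetical):

```go
package main

import (
	"fmt"
	"sync"
)

// localNodeStore stands in for the controller's watch-backed cache.
// In the real controller a watch goroutine keeps it up to date; here
// we just expose Replace for illustration.
type localNodeStore struct {
	mu    sync.RWMutex
	nodes map[string]string // node name -> readiness, simplified
}

// Replace swaps the store's contents wholesale, as a list+watch
// resync would.
func (s *localNodeStore) Replace(nodes map[string]string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.nodes = nodes
}

// List is served entirely from memory: no apiserver round-trip, at the
// cost of possibly reading slightly stale (eventually consistent) data.
func (s *localNodeStore) List() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]string, 0, len(s.nodes))
	for name := range s.nodes {
		out = append(out, name)
	}
	return out
}

func main() {
	store := &localNodeStore{}
	store.Replace(map[string]string{"node-1": "Ready", "node-2": "NotReady"})
	fmt.Println(store.List())
}
```

This trade-off is acceptable for monitorNodeStatus because, as the updated comment in the diff notes, the controller already tolerates small delays relative to etcd state.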
Merged by Kubernetes Submit Queue on 2016-12-21 03:13:09 -08:00, committed by GitHub
commit f42574893b
2 changed files with 59 additions and 7 deletions


@@ -408,14 +408,13 @@ func (nc *NodeController) Run() {
 // post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
 // not reachable for a long period of time.
 func (nc *NodeController) monitorNodeStatus() error {
-	// It is enough to list Nodes from apiserver, since we can tolerate some small
-	// delays comparing to state from etcd and there is eventual consistency anyway.
-	// TODO: We should list them from local cache: nodeStore.
-	nodes, err := nc.kubeClient.Core().Nodes().List(v1.ListOptions{ResourceVersion: "0"})
+	// We are listing nodes from local cache as we can tolerate some small delays
+	// comparing to state from etcd and there is eventual consistency anyway.
+	nodes, err := nc.nodeStore.List()
 	if err != nil {
 		return err
 	}
-	added, deleted := nc.checkForNodeAddedDeleted(nodes)
+	added, deleted := nc.checkForNodeAddedDeleted(&nodes)
 	for i := range added {
 		glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
 		recordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
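
Note the signature change rippling through this function: judging by the diff, nc.nodeStore.List() returns a v1.NodeList value rather than the *v1.NodeList the client call produced, which is why the call sites (checkForNodeAddedDeleted above, handleDisruption in the third hunk) now take &nodes.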
@@ -444,7 +443,12 @@ func (nc *NodeController) monitorNodeStatus() error {
 		var gracePeriod time.Duration
 		var observedReadyCondition v1.NodeCondition
 		var currentReadyCondition *v1.NodeCondition
-		node := &nodes.Items[i]
+		nodeCopy, err := api.Scheme.DeepCopy(&nodes.Items[i])
+		if err != nil {
+			utilruntime.HandleError(err)
+			continue
+		}
+		node := nodeCopy.(*v1.Node)
 		for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
 			gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
 			if err == nil {
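
The DeepCopy added here is the other consequence of reading from the shared cache: objects returned by the store are the same instances every other consumer sees, and tryUpdateNodeStatus mutates the node it is given, so working on the cached object in place would corrupt the cache. A toy sketch of the copy-before-mutate rule (simplified; the real code goes through api.Scheme.DeepCopy because v1.Node contains nested pointers and slices):

```go
package main

import "fmt"

// node is a stand-in for v1.Node with only flat fields, so a value copy
// is a complete copy; the real type needs a recursive deep copy.
type node struct {
	Name   string
	Status string
}

func updateStatus(cached *node) node {
	// Copy first: `cached` may be shared with other readers of the cache.
	local := *cached
	local.Status = "Ready" // mutate only the private copy
	return local
}

func main() {
	shared := &node{Name: "node-1", Status: "Unknown"}
	updated := updateStatus(shared)
	fmt.Println(shared.Status, updated.Status) // Unknown Ready
}
```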
@@ -522,7 +526,7 @@ func (nc *NodeController) monitorNodeStatus() error {
 			}
 		}
 	}
-	nc.handleDisruption(zoneToNodeConditions, nodes)
+	nc.handleDisruption(zoneToNodeConditions, &nodes)
 	return nil
 }


@@ -79,6 +79,18 @@ func NewNodeControllerFromClient(
 	return nc, nil
 }
 
+func syncNodeStore(nc *NodeController, fakeNodeHandler *testutil.FakeNodeHandler) error {
+	nodes, err := fakeNodeHandler.List(v1.ListOptions{})
+	if err != nil {
+		return err
+	}
+	newElems := make([]interface{}, 0, len(nodes.Items))
+	for i := range nodes.Items {
+		newElems = append(newElems, &nodes.Items[i])
+	}
+	return nc.nodeStore.Replace(newElems, "newRV")
+}
+
 func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
 	evictionTimeout := 10 * time.Minute
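
This helper is the test-side counterpart of the change: because monitorNodeStatus no longer asks the (fake) apiserver for nodes, each test has to mirror the fake handler's current nodes into the controller's local store before every monitorNodeStatus call, which is what the repeated syncNodeStore calls in the hunks below do. Replace swaps the store's entire contents wholesale; the "newRV" argument is just an arbitrary resource-version string, since nothing in these tests inspects it.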
@@ -508,6 +520,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 		for _, ds := range item.daemonSets {
 			nodeController.daemonSetStore.Add(&ds)
 		}
+		if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -516,6 +531,9 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
 			item.fakeNodeHandler.Existing[1].Status = item.secondNodeNewStatus
 		}
+		if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -643,6 +661,9 @@ func TestPodStatusChange(t *testing.T) {
 			evictionTimeout, testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold, testNodeMonitorGracePeriod,
 			testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() metav1.Time { return fakeNow }
+		if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -651,6 +672,9 @@ func TestPodStatusChange(t *testing.T) {
 			item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
 			item.fakeNodeHandler.Existing[1].Status = item.secondNodeNewStatus
 		}
+		if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -1159,6 +1183,9 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
 		nodeController.enterFullDisruptionFunc = func(nodeNum int) float32 {
 			return testRateLimiterQPS
 		}
+		if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("%v: unexpected error: %v", item.description, err)
 		}
@@ -1174,6 +1201,9 @@ func TestMonitorNodeStatusEvictPodsWithDisruption(t *testing.T) {
 			fakeNodeHandler.Existing[i].Status = item.updatedNodeStatuses[i]
 		}
+		if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("%v: unexpected error: %v", item.description, err)
 		}
@@ -1244,6 +1274,9 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
 		return false, nil
 	}
 	// monitorNodeStatus should allow this node to be immediately deleted
+	if err := syncNodeStore(nodeController, fnh); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
 	if err := nodeController.monitorNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
@@ -1472,12 +1505,18 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 			testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() metav1.Time { return fakeNow }
+		if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if item.timeToPass > 0 {
 			nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(item.timeToPass)} }
 			item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
+			if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
 			if err := nodeController.monitorNodeStatus(); err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
@@ -1699,12 +1738,18 @@ func TestMonitorNodeStatusMarkPodsNotReady(t *testing.T) {
 			testRateLimiterQPS, testRateLimiterQPS, testLargeClusterThreshold, testUnhealtyThreshold,
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, nil, 0, false)
 		nodeController.now = func() metav1.Time { return fakeNow }
+		if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("Case[%d] unexpected error: %v", i, err)
 		}
 		if item.timeToPass > 0 {
 			nodeController.now = func() metav1.Time { return metav1.Time{Time: fakeNow.Add(item.timeToPass)} }
 			item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
+			if err := syncNodeStore(nodeController, item.fakeNodeHandler); err != nil {
+				t.Errorf("unexpected error: %v", err)
+			}
 			if err := nodeController.monitorNodeStatus(); err != nil {
 				t.Errorf("Case[%d] unexpected error: %v", i, err)
 			}
@@ -1761,6 +1806,9 @@ func TestNodeEventGeneration(t *testing.T) {
 	nodeController.now = func() metav1.Time { return fakeNow }
 	fakeRecorder := testutil.NewFakeRecorder()
 	nodeController.recorder = fakeRecorder
+	if err := syncNodeStore(nodeController, fakeNodeHandler); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
 	if err := nodeController.monitorNodeStatus(); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}