diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 89d042a51fe..3612586ac96 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -203,8 +203,8 @@ func startComponents(manifestURL string) (apiServerURL string) {
     nodeResources := &api.NodeResources{}
 
-    nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 5*time.Minute)
-    nodeController.Run(5*time.Second, 10, true)
+    nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
+    nodeController.Run(5*time.Second, true)
 
     // Kubelet (localhost)
     testRootDir := makeTempDirOrDie("kubelet_integ_1.")
diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go
index 4816313059a..39a950be88a 100644
--- a/cmd/kubernetes/kubernetes.go
+++ b/cmd/kubernetes/kubernetes.go
@@ -124,8 +124,8 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
     }
     kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
 
-    nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 5*time.Minute)
-    nodeController.Run(10*time.Second, 10, true)
+    nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
+    nodeController.Run(10*time.Second, true)
 
     endpoints := service.NewEndpointController(cl)
     go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go
index d9ac1fc0734..2b8dec28c30 100644
--- a/pkg/cloudprovider/controller/nodecontroller.go
+++ b/pkg/cloudprovider/controller/nodecontroller.go
@@ -47,6 +47,7 @@ type NodeController struct {
     nodes              []string
     kubeClient         client.Interface
     kubeletClient      client.KubeletHealthChecker
+    registerRetryCount int
     podEvictionTimeout time.Duration
 }
 
@@ -60,6 +61,7 @@ func NewNodeController(
     staticResources *api.NodeResources,
     kubeClient client.Interface,
     kubeletClient client.KubeletHealthChecker,
+    registerRetryCount int,
     podEvictionTimeout time.Duration) *NodeController {
     return &NodeController{
         cloud:              cloud,
@@ -68,13 +70,14 @@ func NewNodeController(
         staticResources:    staticResources,
         kubeClient:         kubeClient,
         kubeletClient:      kubeletClient,
+        registerRetryCount: registerRetryCount,
         podEvictionTimeout: podEvictionTimeout,
     }
 }
 
 // Run creates initial node list and start syncing instances from cloudprovider if any.
 // It also starts syncing cluster node status.
-func (s *NodeController) Run(period time.Duration, retryCount int, syncNodeList bool) {
+func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
     // Register intial set of nodes with their status set.
     var nodes *api.NodeList
     var err error
@@ -98,7 +101,7 @@ func (s *NodeController) Run(period time.Duration, retryCount int, syncNodeList
         if err != nil {
             glog.Errorf("Error getting nodes ips: %v", err)
         }
-        if err = s.RegisterNodes(nodes, retryCount, period); err != nil {
+        if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil {
             glog.Errorf("Error registrying node list %+v: %v", nodes, err)
         }
 
@@ -327,7 +330,7 @@ func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
 func (s *NodeController) deletePods(nodeID string) error {
     glog.V(2).Infof("Delete all pods from %v", nodeID)
     // TODO: We don't yet have field selectors from client, see issue #1362.
-    pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Set{}.AsSelector())
+    pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
     if err != nil {
         return err
     }
diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go
index 479c02bb4fe..9c7abfbfec9 100644
--- a/pkg/cloudprovider/controller/nodecontroller_test.go
+++ b/pkg/cloudprovider/controller/nodecontroller_test.go
@@ -230,7 +230,7 @@ func TestRegisterNodes(t *testing.T) {
         for _, machine := range item.machines {
             nodes.Items = append(nodes.Items, *newNode(machine))
         }
-        nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
+        nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
         err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
         if !item.expectedFail && err != nil {
             t.Errorf("unexpected error: %v", err)
@@ -268,10 +268,27 @@ func TestCreateStaticNodes(t *testing.T) {
                 },
             },
         },
+        {
+            machines: []string{"node0", "node1"},
+            expectedNodes: &api.NodeList{
+                Items: []api.Node{
+                    {
+                        ObjectMeta: api.ObjectMeta{Name: "node0"},
+                        Spec:       api.NodeSpec{},
+                        Status:     api.NodeStatus{},
+                    },
+                    {
+                        ObjectMeta: api.ObjectMeta{Name: "node1"},
+                        Spec:       api.NodeSpec{},
+                        Status:     api.NodeStatus{},
+                    },
+                },
+            },
+        },
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil, time.Minute)
+        nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil, 10, time.Minute)
         nodes, err := nodeController.StaticNodes()
         if err != nil {
             t.Errorf("unexpected error: %v", err)
@@ -311,10 +328,28 @@ func TestCreateCloudNodes(t *testing.T) {
                 },
             },
         },
+        {
+            fakeCloud: &fake_cloud.FakeCloud{
+                Machines:      []string{"node0", "node1"},
+                NodeResources: &api.NodeResources{Capacity: resourceList},
+            },
+            expectedNodes: &api.NodeList{
+                Items: []api.Node{
+                    {
+                        ObjectMeta: api.ObjectMeta{Name: "node0"},
+                        Spec:       api.NodeSpec{Capacity: resourceList},
+                    },
+                    {
+                        ObjectMeta: api.ObjectMeta{Name: "node1"},
+                        Spec:       api.NodeSpec{Capacity: resourceList},
+                    },
+                },
+            },
+        },
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, time.Minute)
+        nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute)
         nodes, err := nodeController.CloudNodes()
         if err != nil {
             t.Errorf("unexpected error: %v", err)
@@ -335,6 +370,20 @@ func TestSyncCloud(t *testing.T) {
         expectedDeleted      []string
     }{
         {
+            // 1 existing node, 1 cloud node: do nothing.
+            fakeNodeHandler: &FakeNodeHandler{
+                Existing: []*api.Node{newNode("node0")},
+            },
+            fakeCloud: &fake_cloud.FakeCloud{
+                Machines: []string{"node0"},
+            },
+            matchRE:              ".*",
+            expectedRequestCount: 1, // List
+            expectedCreated:      []string{},
+            expectedDeleted:      []string{},
+        },
+        {
+            // 1 existing node, 2 cloud nodes: create 1.
             fakeNodeHandler: &FakeNodeHandler{
                 Existing: []*api.Node{newNode("node0")},
             },
@@ -347,6 +396,7 @@ func TestSyncCloud(t *testing.T) {
             expectedDeleted: []string{},
         },
         {
+            // 2 existing nodes, 1 cloud node: delete 1.
             fakeNodeHandler: &FakeNodeHandler{
                 Existing: []*api.Node{newNode("node0"), newNode("node1")},
             },
@@ -359,6 +409,7 @@ func TestSyncCloud(t *testing.T) {
             expectedDeleted: []string{"node1"},
         },
         {
+            // 1 existing node, 3 cloud nodes but only 2 match regex: delete 1.
             fakeNodeHandler: &FakeNodeHandler{
                 Existing: []*api.Node{newNode("node0")},
             },
@@ -373,7 +424,7 @@ func TestSyncCloud(t *testing.T) {
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
+        nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
         if err := nodeController.SyncCloud(); err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -451,7 +502,7 @@ func TestSyncCloudDeletePods(t *testing.T) {
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
+        nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
         if err := nodeController.SyncCloud(); err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -503,7 +554,7 @@ func TestHealthCheckNode(t *testing.T) {
             },
         },
         {
-            node: newNode("node1"),
+            node: newNode("node0"),
             fakeKubeletClient: &FakeKubeletClient{
                 Status: probe.Failure,
                 Err:    errors.New("Error"),
             },
@@ -519,7 +570,7 @@ func TestHealthCheckNode(t *testing.T) {
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, time.Minute)
+        nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute)
         conditions := nodeController.DoCheck(item.node)
         for i := range conditions {
             if conditions[i].LastTransitionTime.IsZero() {
@@ -557,7 +608,7 @@ func TestPopulateNodeIPs(t *testing.T) {
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, time.Minute)
+        nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute)
         result, err := nodeController.PopulateIPs(item.nodes)
         // In case of IP querying error, we should continue.
         if err != nil {
@@ -633,7 +684,7 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, time.Minute)
+        nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
         if err := nodeController.SyncNodeStatus(); err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -792,7 +843,7 @@ func TestSyncNodeStatusDeletePods(t *testing.T) {
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 5*time.Minute)
+        nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute)
         if err := nodeController.SyncNodeStatus(); err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -857,7 +908,7 @@ func TestSyncNodeStatus(t *testing.T) {
     }
 
     for _, item := range table {
-        nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, time.Minute)
+        nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
         if err := nodeController.SyncNodeStatus(); err != nil {
             t.Errorf("unexpected error: %v", err)
         }
diff --git a/pkg/controllermanager/controllermanager.go b/pkg/controllermanager/controllermanager.go
index 6432add050d..cc1246e5d95 100644
--- a/pkg/controllermanager/controllermanager.go
+++ b/pkg/controllermanager/controllermanager.go
@@ -54,11 +54,8 @@ type CMServer struct {
     ResourceQuotaSyncPeriod time.Duration
     RegisterRetryCount      int
     MachineList             util.StringList
-<<<<<<< HEAD
     SyncNodeList            bool
-=======
     PodEvictionTimeout      time.Duration
->>>>>>> Remove pods from failed node
 
     // TODO: Discover these by pinging the host machines, and rip out these params.
     NodeMilliCPU int64
@@ -176,8 +173,9 @@ func (s *CMServer) Run(_ []string) error {
         },
     }
 
-    nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, kubeletClient, s.PodEvictionTimeout)
-    nodeController.Run(s.NodeSyncPeriod, s.RegisterRetryCount, s.SyncNodeList)
+    nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
+        kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
+    nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
 
     resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
     resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
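Note on the API change above: the patch moves the registration retry count out of `Run`'s parameter list and onto the controller itself (`registerRetryCount`, supplied once to `NewNodeController`), so configuration is fixed at construction time and `Run` keeps only loop-level arguments. Below is a minimal, self-contained sketch of the same pattern — stand-in types only, not the real Kubernetes controller; the values `10` and `5*time.Minute` mirror the call sites in this patch:

```go
package main

import (
	"fmt"
	"time"
)

// nodeController is a stand-in for the real NodeController: the retry
// count lives on the struct instead of being threaded through Run.
type nodeController struct {
	registerRetryCount int
	podEvictionTimeout time.Duration
}

// newNodeController mirrors the new constructor shape: the retry count
// is set once, at construction time.
func newNodeController(registerRetryCount int, podEvictionTimeout time.Duration) *nodeController {
	return &nodeController{
		registerRetryCount: registerRetryCount,
		podEvictionTimeout: podEvictionTimeout,
	}
}

// Run no longer takes retryCount; it reads the value stored on the struct.
func (c *nodeController) Run(period time.Duration, syncNodeList bool) {
	fmt.Printf("sync every %v (retries=%d, syncNodeList=%v, evict after %v)\n",
		period, c.registerRetryCount, syncNodeList, c.podEvictionTimeout)
}

func main() {
	// Before: nodeController.Run(10*time.Second, 10, true)
	// After: the retry count (10) moves into the constructor.
	c := newNodeController(10, 5*time.Minute)
	c.Run(10*time.Second, true)
}
```

One effect visible in the call sites above: every `NewNodeController` caller (integration test, standalone binary, controller manager, unit tests) now states the retry count exactly once, and `Run`'s signature no longer needs to change as registration behavior gains options.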