mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
Refactor RegisterRetryCount, improve tests
This commit is contained in:
parent
55b9944cfe
commit
4c6f6e0efc
@ -203,8 +203,8 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
|||||||
|
|
||||||
nodeResources := &api.NodeResources{}
|
nodeResources := &api.NodeResources{}
|
||||||
|
|
||||||
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 5*time.Minute)
|
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
|
||||||
nodeController.Run(5*time.Second, 10, true)
|
nodeController.Run(5*time.Second, true)
|
||||||
|
|
||||||
// Kubelet (localhost)
|
// Kubelet (localhost)
|
||||||
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
|
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
|
||||||
|
@ -124,8 +124,8 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
|
|||||||
}
|
}
|
||||||
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
|
kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
|
||||||
|
|
||||||
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 5*time.Minute)
|
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
|
||||||
nodeController.Run(10*time.Second, 10, true)
|
nodeController.Run(10*time.Second, true)
|
||||||
|
|
||||||
endpoints := service.NewEndpointController(cl)
|
endpoints := service.NewEndpointController(cl)
|
||||||
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
|
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
|
||||||
|
@ -47,6 +47,7 @@ type NodeController struct {
|
|||||||
nodes []string
|
nodes []string
|
||||||
kubeClient client.Interface
|
kubeClient client.Interface
|
||||||
kubeletClient client.KubeletHealthChecker
|
kubeletClient client.KubeletHealthChecker
|
||||||
|
registerRetryCount int
|
||||||
podEvictionTimeout time.Duration
|
podEvictionTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,6 +61,7 @@ func NewNodeController(
|
|||||||
staticResources *api.NodeResources,
|
staticResources *api.NodeResources,
|
||||||
kubeClient client.Interface,
|
kubeClient client.Interface,
|
||||||
kubeletClient client.KubeletHealthChecker,
|
kubeletClient client.KubeletHealthChecker,
|
||||||
|
registerRetryCount int,
|
||||||
podEvictionTimeout time.Duration) *NodeController {
|
podEvictionTimeout time.Duration) *NodeController {
|
||||||
return &NodeController{
|
return &NodeController{
|
||||||
cloud: cloud,
|
cloud: cloud,
|
||||||
@ -68,13 +70,14 @@ func NewNodeController(
|
|||||||
staticResources: staticResources,
|
staticResources: staticResources,
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
kubeletClient: kubeletClient,
|
kubeletClient: kubeletClient,
|
||||||
|
registerRetryCount: registerRetryCount,
|
||||||
podEvictionTimeout: podEvictionTimeout,
|
podEvictionTimeout: podEvictionTimeout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run creates initial node list and start syncing instances from cloudprovider if any.
|
// Run creates initial node list and start syncing instances from cloudprovider if any.
|
||||||
// It also starts syncing cluster node status.
|
// It also starts syncing cluster node status.
|
||||||
func (s *NodeController) Run(period time.Duration, retryCount int, syncNodeList bool) {
|
func (s *NodeController) Run(period time.Duration, syncNodeList bool) {
|
||||||
// Register intial set of nodes with their status set.
|
// Register intial set of nodes with their status set.
|
||||||
var nodes *api.NodeList
|
var nodes *api.NodeList
|
||||||
var err error
|
var err error
|
||||||
@ -98,7 +101,7 @@ func (s *NodeController) Run(period time.Duration, retryCount int, syncNodeList
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error getting nodes ips: %v", err)
|
glog.Errorf("Error getting nodes ips: %v", err)
|
||||||
}
|
}
|
||||||
if err = s.RegisterNodes(nodes, retryCount, period); err != nil {
|
if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil {
|
||||||
glog.Errorf("Error registrying node list %+v: %v", nodes, err)
|
glog.Errorf("Error registrying node list %+v: %v", nodes, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -327,7 +330,7 @@ func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
|
|||||||
func (s *NodeController) deletePods(nodeID string) error {
|
func (s *NodeController) deletePods(nodeID string) error {
|
||||||
glog.V(2).Infof("Delete all pods from %v", nodeID)
|
glog.V(2).Infof("Delete all pods from %v", nodeID)
|
||||||
// TODO: We don't yet have field selectors from client, see issue #1362.
|
// TODO: We don't yet have field selectors from client, see issue #1362.
|
||||||
pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Set{}.AsSelector())
|
pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -230,7 +230,7 @@ func TestRegisterNodes(t *testing.T) {
|
|||||||
for _, machine := range item.machines {
|
for _, machine := range item.machines {
|
||||||
nodes.Items = append(nodes.Items, *newNode(machine))
|
nodes.Items = append(nodes.Items, *newNode(machine))
|
||||||
}
|
}
|
||||||
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
|
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
|
||||||
err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
|
err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
|
||||||
if !item.expectedFail && err != nil {
|
if !item.expectedFail && err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -268,10 +268,27 @@ func TestCreateStaticNodes(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
machines: []string{"node0", "node1"},
|
||||||
|
expectedNodes: &api.NodeList{
|
||||||
|
Items: []api.Node{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node0"},
|
||||||
|
Spec: api.NodeSpec{},
|
||||||
|
Status: api.NodeStatus{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node1"},
|
||||||
|
Spec: api.NodeSpec{},
|
||||||
|
Status: api.NodeStatus{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil, time.Minute)
|
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil, 10, time.Minute)
|
||||||
nodes, err := nodeController.StaticNodes()
|
nodes, err := nodeController.StaticNodes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -311,10 +328,28 @@ func TestCreateCloudNodes(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
fakeCloud: &fake_cloud.FakeCloud{
|
||||||
|
Machines: []string{"node0", "node1"},
|
||||||
|
NodeResources: &api.NodeResources{Capacity: resourceList},
|
||||||
|
},
|
||||||
|
expectedNodes: &api.NodeList{
|
||||||
|
Items: []api.Node{
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node0"},
|
||||||
|
Spec: api.NodeSpec{Capacity: resourceList},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
ObjectMeta: api.ObjectMeta{Name: "node1"},
|
||||||
|
Spec: api.NodeSpec{Capacity: resourceList},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, time.Minute)
|
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute)
|
||||||
nodes, err := nodeController.CloudNodes()
|
nodes, err := nodeController.CloudNodes()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
@ -335,6 +370,20 @@ func TestSyncCloud(t *testing.T) {
|
|||||||
expectedDeleted []string
|
expectedDeleted []string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
|
// 1 existing node, 1 cloud nodes: do nothing.
|
||||||
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
|
Existing: []*api.Node{newNode("node0")},
|
||||||
|
},
|
||||||
|
fakeCloud: &fake_cloud.FakeCloud{
|
||||||
|
Machines: []string{"node0"},
|
||||||
|
},
|
||||||
|
matchRE: ".*",
|
||||||
|
expectedRequestCount: 1, // List
|
||||||
|
expectedCreated: []string{},
|
||||||
|
expectedDeleted: []string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// 1 existing node, 2 cloud nodes: create 1.
|
||||||
fakeNodeHandler: &FakeNodeHandler{
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
Existing: []*api.Node{newNode("node0")},
|
Existing: []*api.Node{newNode("node0")},
|
||||||
},
|
},
|
||||||
@ -347,6 +396,7 @@ func TestSyncCloud(t *testing.T) {
|
|||||||
expectedDeleted: []string{},
|
expectedDeleted: []string{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
// 2 existing nodes, 1 cloud node: delete 1.
|
||||||
fakeNodeHandler: &FakeNodeHandler{
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
Existing: []*api.Node{newNode("node0"), newNode("node1")},
|
Existing: []*api.Node{newNode("node0"), newNode("node1")},
|
||||||
},
|
},
|
||||||
@ -359,6 +409,7 @@ func TestSyncCloud(t *testing.T) {
|
|||||||
expectedDeleted: []string{"node1"},
|
expectedDeleted: []string{"node1"},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
// 1 existing node, 3 cloud nodes but only 2 match regex: delete 1.
|
||||||
fakeNodeHandler: &FakeNodeHandler{
|
fakeNodeHandler: &FakeNodeHandler{
|
||||||
Existing: []*api.Node{newNode("node0")},
|
Existing: []*api.Node{newNode("node0")},
|
||||||
},
|
},
|
||||||
@ -373,7 +424,7 @@ func TestSyncCloud(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
|
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
|
||||||
if err := nodeController.SyncCloud(); err != nil {
|
if err := nodeController.SyncCloud(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -451,7 +502,7 @@ func TestSyncCloudDeletePods(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, time.Minute)
|
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
|
||||||
if err := nodeController.SyncCloud(); err != nil {
|
if err := nodeController.SyncCloud(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -503,7 +554,7 @@ func TestHealthCheckNode(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
node: newNode("node1"),
|
node: newNode("node0"),
|
||||||
fakeKubeletClient: &FakeKubeletClient{
|
fakeKubeletClient: &FakeKubeletClient{
|
||||||
Status: probe.Failure,
|
Status: probe.Failure,
|
||||||
Err: errors.New("Error"),
|
Err: errors.New("Error"),
|
||||||
@ -519,7 +570,7 @@ func TestHealthCheckNode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, time.Minute)
|
nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute)
|
||||||
conditions := nodeController.DoCheck(item.node)
|
conditions := nodeController.DoCheck(item.node)
|
||||||
for i := range conditions {
|
for i := range conditions {
|
||||||
if conditions[i].LastTransitionTime.IsZero() {
|
if conditions[i].LastTransitionTime.IsZero() {
|
||||||
@ -557,7 +608,7 @@ func TestPopulateNodeIPs(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, time.Minute)
|
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute)
|
||||||
result, err := nodeController.PopulateIPs(item.nodes)
|
result, err := nodeController.PopulateIPs(item.nodes)
|
||||||
// In case of IP querying error, we should continue.
|
// In case of IP querying error, we should continue.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -633,7 +684,7 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, time.Minute)
|
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
|
||||||
if err := nodeController.SyncNodeStatus(); err != nil {
|
if err := nodeController.SyncNodeStatus(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -792,7 +843,7 @@ func TestSyncNodeStatusDeletePods(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 5*time.Minute)
|
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute)
|
||||||
if err := nodeController.SyncNodeStatus(); err != nil {
|
if err := nodeController.SyncNodeStatus(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -857,7 +908,7 @@ func TestSyncNodeStatus(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, item := range table {
|
for _, item := range table {
|
||||||
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, time.Minute)
|
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
|
||||||
if err := nodeController.SyncNodeStatus(); err != nil {
|
if err := nodeController.SyncNodeStatus(); err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -54,11 +54,8 @@ type CMServer struct {
|
|||||||
ResourceQuotaSyncPeriod time.Duration
|
ResourceQuotaSyncPeriod time.Duration
|
||||||
RegisterRetryCount int
|
RegisterRetryCount int
|
||||||
MachineList util.StringList
|
MachineList util.StringList
|
||||||
<<<<<<< HEAD
|
|
||||||
SyncNodeList bool
|
SyncNodeList bool
|
||||||
=======
|
|
||||||
PodEvictionTimeout time.Duration
|
PodEvictionTimeout time.Duration
|
||||||
>>>>>>> Remove pods from failed node
|
|
||||||
|
|
||||||
// TODO: Discover these by pinging the host machines, and rip out these params.
|
// TODO: Discover these by pinging the host machines, and rip out these params.
|
||||||
NodeMilliCPU int64
|
NodeMilliCPU int64
|
||||||
@ -176,8 +173,9 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, kubeletClient, s.PodEvictionTimeout)
|
nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
|
||||||
nodeController.Run(s.NodeSyncPeriod, s.RegisterRetryCount, s.SyncNodeList)
|
kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
|
||||||
|
nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
|
||||||
|
|
||||||
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
|
resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
|
||||||
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
|
resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
|
||||||
|
Loading…
Reference in New Issue
Block a user