Remove kubelet dependency on nodecontroller

commit 236db3c252
parent 86d3072492
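In short: NewNodeController no longer takes a kubelet client, so every caller simply drops that argument. A minimal before/after sketch of one call site, assembled from the integration-setup hunk below (the variable names are the ones that hunk uses; this is an illustration, not a complete program):

// Before: callers constructed and passed a kubelet client (here the test fake).
nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl,
	fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter(),
	40*time.Second, 60*time.Second, 5*time.Second, "")

// After: the kubelet-client argument is gone; all other arguments are unchanged.
nodeController = nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl,
	10, 5*time.Minute, util.NewFakeRateLimiter(),
	40*time.Second, 60*time.Second, 5*time.Second, "")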
@@ -221,7 +221,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
 api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
 }}

-nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "")
+nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "")
 nodeController.Run(5*time.Second, true)
 cadvisorInterface := new(cadvisor.Fake)

@@ -69,7 +69,6 @@ type CMServer struct {
 NodeMilliCPU int64
 NodeMemory resource.Quantity

-KubeletConfig client.KubeletConfig
 ClusterName string
 EnableProfiling bool
 }

@@ -87,12 +86,7 @@ func NewCMServer() *CMServer {
 NodeMilliCPU: 1000,
 NodeMemory: resource.MustParse("3Gi"),
 SyncNodeList: true,
-KubeletConfig: client.KubeletConfig{
-Port: ports.KubeletPort,
-EnableHttps: true,
-HTTPTimeout: time.Duration(5) * time.Second,
-},
 ClusterName: "kubernetes",
 }
 return &s
 }

@@ -133,7 +127,6 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 // TODO: in the meantime, use resource.QuantityFlag() instead of these
 fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
 fs.Var(resource.NewQuantityFlagValue(&s.NodeMemory), "node_memory", "The amount of memory (in bytes) provisioned on each node")
-client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig)
 fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
 fs.BoolVar(&s.EnableProfiling, "profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
 }

@@ -182,11 +175,6 @@ func (s *CMServer) Run(_ []string) error {
 controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
 controllerManager.Run(replicationControllerPkg.DefaultSyncPeriod)

-kubeletClient, err := client.NewKubeletClient(&s.KubeletConfig)
-if err != nil {
-glog.Fatalf("Failure to start kubelet client: %v", err)
-}
-
 cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
 nodeResources := &api.NodeResources{
 Capacity: api.ResourceList{

@@ -200,7 +188,7 @@ func (s *CMServer) Run(_ []string) error {
 }

 nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
-kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
+kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
 s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName)
 nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)

@@ -39,7 +39,6 @@ import (
 "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/master"
-"github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/service"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/util"

@@ -128,10 +127,9 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
 api.ResourceMemory: *resource.NewQuantity(nodeMemory, resource.BinarySI),
 },
 }
-kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}

 nodeController := nodeControllerPkg.NewNodeController(
-nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "")
+nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "")
 nodeController.Run(10*time.Second, true)

 endpoints := service.NewEndpointController(cl)

@@ -58,7 +58,6 @@ type NodeController struct {
 staticResources *api.NodeResources
 nodes []string
 kubeClient client.Interface
-kubeletClient client.KubeletClient
 recorder record.EventRecorder
 registerRetryCount int
 podEvictionTimeout time.Duration

@@ -104,7 +103,6 @@ func NewNodeController(
 nodes []string,
 staticResources *api.NodeResources,
 kubeClient client.Interface,
-kubeletClient client.KubeletClient,
 registerRetryCount int,
 podEvictionTimeout time.Duration,
 deletingPodsRateLimiter util.RateLimiter,

@@ -126,7 +124,6 @@ func NewNodeController(
 nodes: nodes,
 staticResources: staticResources,
 kubeClient: kubeClient,
-kubeletClient: kubeletClient,
 recorder: recorder,
 registerRetryCount: registerRetryCount,
 podEvictionTimeout: podEvictionTimeout,

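Pieced together from the parameter hunks above and the call sites elsewhere in this diff, the constructor now has roughly the shape sketched below. The first two parameters and the names of the trailing duration/string parameters are not shown verbatim in the diff; they are inferred from the arguments callers pass (a cloud provider, a node-name regexp, then the monitor grace period, startup grace period, monitor period, and cluster name) and should be treated as assumptions:

// Approximate post-change signature; lines marked "inferred" are assumptions.
func NewNodeController(
	cloud cloudprovider.Interface, // inferred: callers pass a cloud provider or nil
	matchRE string,                // inferred: callers pass a node-name regexp such as ".*"
	nodes []string,
	staticResources *api.NodeResources,
	kubeClient client.Interface, // the kubeletClient client.KubeletClient parameter was removed here
	registerRetryCount int,
	podEvictionTimeout time.Duration,
	deletingPodsRateLimiter util.RateLimiter,
	nodeMonitorGracePeriod, nodeStartupGracePeriod, nodeMonitorPeriod time.Duration, // inferred names
	clusterName string, // inferred name
) *NodeController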
@@ -19,7 +19,6 @@ package controller
 import (
 "errors"
 "fmt"
-"net/http"
 "reflect"
 "sort"
 "testing"

@@ -33,7 +32,6 @@ import (
 fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
-"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )

@@ -136,24 +134,6 @@ func (m *FakeNodeHandler) Watch(label labels.Selector, field fields.Selector, re
 return nil, nil
 }

-// FakeKubeletClient is a fake implementation of KubeletClient.
-type FakeKubeletClient struct {
-Status probe.Result
-Err error
-}
-
-func (c *FakeKubeletClient) GetPodStatus(host, podNamespace, podID string) (api.PodStatusResult, error) {
-return api.PodStatusResult{}, errors.New("Not Implemented")
-}
-
-func (c *FakeKubeletClient) GetConnectionInfo(host string) (string, uint, http.RoundTripper, error) {
-return "", 0, nil, errors.New("Not Implemented")
-}
-
-func (c *FakeKubeletClient) HealthCheck(host string) (probe.Result, error) {
-return c.Status, c.Err
-}
-
 func TestRegisterNodes(t *testing.T) {
 table := []struct {
 fakeNodeHandler *FakeNodeHandler

@@ -257,7 +237,7 @@ func TestRegisterNodes(t *testing.T) {
 for _, machine := range item.machines {
 nodes.Items = append(nodes.Items, *newNode(machine))
 }
-nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
+nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
 util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
 if !item.expectedFail && err != nil {

@@ -343,7 +323,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
 },
 }
 for _, item := range table {
-nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute,
+nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute,
 util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 nodes, err := nodeController.GetStaticNodesWithSpec()
 if err != nil {

@@ -405,7 +385,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
 }

 for _, item := range table {
-nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute,
+nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute,
 util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 nodes, err := nodeController.GetCloudNodesWithSpec()
 if err != nil {

@@ -515,7 +495,7 @@ func TestSyncCloudNodes(t *testing.T) {
 if item.fakeNodeHandler.Fake == nil {
 item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
 }
-nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
+nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
 util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 if err := nodeController.SyncCloudNodes(); err != nil {
 t.Errorf("unexpected error: %v", err)

@@ -599,7 +579,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
 if item.fakeNodeHandler.Fake == nil {
 item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
 }
-nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
+nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
 util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 if err := nodeController.SyncCloudNodes(); err != nil {
 t.Errorf("unexpected error: %v", err)

@@ -668,7 +648,7 @@ func TestSyncCloudNodesReconcilesExternalService(t *testing.T) {
 }

 for _, item := range table {
-nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil,
+nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler,
 10, time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "kubernetes")
 if err := nodeController.SyncCloudNodes(); err != nil {
 t.Errorf("unexpected error: %v", err)

@@ -704,7 +684,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
 }

 for _, item := range table {
-nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute,
+nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute,
 util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 result, err := nodeController.PopulateAddresses(item.nodes)
 // In case of IP querying error, we should continue.

@@ -903,7 +883,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 }

 for _, item := range table {
-nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10,
+nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10,
 evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
 testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 nodeController.now = func() util.Time { return fakeNow }

@@ -1106,7 +1086,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 }

 for _, item := range table {
-nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute, util.NewFakeRateLimiter(),
+nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(),
 testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 nodeController.now = func() util.Time { return fakeNow }
 if err := nodeController.MonitorNodeStatus(); err != nil {