From c53786ab31dc742708c449d6de9c4f170349f2a1 Mon Sep 17 00:00:00 2001 From: Quinton Hoole Date: Wed, 20 May 2015 13:47:51 -0700 Subject: [PATCH] Revert "Modify nodes to register directly with the master." --- cluster/saltbase/salt/kubelet/default | 15 +- cmd/integration/integration.go | 7 +- .../app/controllermanager.go | 2 +- cmd/kubelet/app/server.go | 7 - cmd/kubernetes/kubernetes.go | 10 +- pkg/api/validation/validation.go | 2 - pkg/cloudprovider/cloud.go | 3 - pkg/cloudprovider/gce/gce.go | 5 +- .../nodecontroller/nodecontroller.go | 385 ++++++++++--- .../nodecontroller/nodecontroller_test.go | 506 +++++++++++++++++- pkg/kubelet/kubelet.go | 173 ++---- pkg/kubelet/kubelet_test.go | 73 +-- 12 files changed, 875 insertions(+), 313 deletions(-) diff --git a/cluster/saltbase/salt/kubelet/default b/cluster/saltbase/salt/kubelet/default index ccb23b9a0fd..c39b050ecd8 100644 --- a/cluster/saltbase/salt/kubelet/default +++ b/cluster/saltbase/salt/kubelet/default @@ -22,19 +22,6 @@ {% set api_servers_with_port = api_servers + ":6443" -%} {% endif -%} -# Disable registration for the kubelet running on the master on GCE. -# TODO(roberthbailey): Make this configurable via an env var in config-default.sh -{% if grains.cloud == 'gce' -%} - {% if grains['roles'][0] == 'kubernetes-master' -%} - {% set api_servers_with_port = "" -%} - {% endif -%} -{% endif -%} - -{% set cloud_provider = "" -%} -{% if grains.cloud is defined -%} - {% set cloud_provider = "--cloud_provider=" + grains.cloud -%} -{% endif -%} - {% set config = "--config=/etc/kubernetes/manifests" -%} {% set hostname_override = "" -%} {% if grains.hostname_override is defined -%} @@ -58,4 +45,4 @@ {% set configure_cbr0 = "--configure-cbr0=" + pillar['allocate_node_cidrs'] -%} {% endif -%} -DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{hostname_override}} {{cloud_provider}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{configure_cbr0}}" +DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{hostname_override}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{configure_cbr0}}" diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index cc6fda5f062..7d8c79f9f64 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -101,6 +101,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st // Setup servers := []string{} glog.Infof("Creating etcd client pointing to %v", servers) + machineList := []string{"localhost", "127.0.0.1"} handler := delegateHandler{} apiServer := httptest.NewServer(&handler) @@ -195,7 +196,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), }} - nodeController := nodecontroller.NewNodeController(nil, "", nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), + nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(5*time.Second, true) cadvisorInterface := new(cadvisor.Fake) @@ -205,7 +206,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st configFilePath := makeTempDirOrDie("config", testRootDir) glog.Infof("Using %s as root dir for kubelet #1", 
testRootDir) fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.15"} - kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, "localhost", testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{}) + kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{}) kubeletapp.RunKubelet(kcfg, nil) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both @@ -213,7 +214,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st testRootDir = makeTempDirOrDie("kubelet_integ_2.", "") glog.Infof("Using %s as root dir for kubelet #2", testRootDir) fakeDocker2.VersionInfo = docker.Env{"ApiVersion=1.15"} - kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, "127.0.0.1", testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{}) + kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{}) kubeletapp.RunKubelet(kcfg, nil) return apiServer.URL, configFilePath } diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 11278bd084f..c9a7276f523 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -228,7 +228,7 @@ func (s *CMServer) Run(_ []string) error { glog.Warning("DEPRECATION NOTICE: sync-node-status flag is being deprecated. It has no effect now and it will be removed in a future version.") } - nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, nodeResources, + nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 817d1b7fc75..668b1b45aa0 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -89,7 +89,6 @@ type KubeletServer struct { HealthzBindAddress util.IP OOMScoreAdj int APIServerList util.StringList - RegisterNode bool ClusterDomain string MasterServiceNamespace string ClusterDNS util.IP @@ -156,7 +155,6 @@ func NewKubeletServer() *KubeletServer { CadvisorPort: 4194, HealthzPort: 10248, HealthzBindAddress: util.IP(net.ParseIP("127.0.0.1")), - RegisterNode: true, // will be ignored if no apiserver is configured OOMScoreAdj: -900, MasterServiceNamespace: api.NamespaceDefault, ImageGCHighThresholdPercent: 90, @@ -213,7 +211,6 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.Var(&s.HealthzBindAddress, "healthz-bind-address", "The IP address for the healthz server to serve on, defaulting to 127.0.0.1 (set to 0.0.0.0 for all interfaces)") fs.IntVar(&s.OOMScoreAdj, "oom-score-adj", s.OOMScoreAdj, "The oom_score_adj value for kubelet process. 
Values must be within the range [-1000, 1000]") fs.Var(&s.APIServerList, "api-servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.") - fs.BoolVar(&s.RegisterNode, "register-node", s.RegisterNode, "Register the node with the apiserver (defaults to true if --api-server is set)") fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods") fs.Var(&s.ClusterDNS, "cluster-dns", "IP address for a cluster DNS server. If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") @@ -321,7 +318,6 @@ func (s *KubeletServer) Run(_ []string) error { MinimumGCAge: s.MinimumGCAge, MaxPerPodContainerCount: s.MaxPerPodContainerCount, MaxContainerCount: s.MaxContainerCount, - RegisterNode: s.RegisterNode, ClusterDomain: s.ClusterDomain, ClusterDNS: s.ClusterDNS, Runonce: s.RunOnce, @@ -497,7 +493,6 @@ func SimpleKubelet(client *client.Client, MinimumGCAge: 10 * time.Second, MaxPerPodContainerCount: 5, MaxContainerCount: 100, - RegisterNode: true, MasterServiceNamespace: masterServiceNamespace, VolumePlugins: volumePlugins, TLSOptions: tlsOptions, @@ -623,7 +618,6 @@ type KubeletConfig struct { MinimumGCAge time.Duration MaxPerPodContainerCount int MaxContainerCount int - RegisterNode bool ClusterDomain string ClusterDNS util.IP EnableServer bool @@ -681,7 +675,6 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.RegistryBurst, gcPolicy, pc.SeenAllSources, - kc.RegisterNode, kc.ClusterDomain, net.IP(kc.ClusterDNS), kc.MasterServiceNamespace, diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 1f0e7d71b7b..aa8b746b4ec 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -123,7 +123,7 @@ func runScheduler(cl *client.Client) { } // RunControllerManager starts a controller -func runControllerManager(cl *client.Client, nodeMilliCPU, nodeMemory int64) { +func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, nodeMemory int64) { nodeResources := &api.NodeResources{ Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(nodeMilliCPU, resource.DecimalSI), @@ -133,7 +133,7 @@ func runControllerManager(cl *client.Client, nodeMilliCPU, nodeMemory int64) { const nodeSyncPeriod = 10 * time.Second nodeController := nodecontroller.NewNodeController( - nil, "", nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), + nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(nodeSyncPeriod, true) @@ -150,16 +150,18 @@ func runControllerManager(cl *client.Client, nodeMilliCPU, nodeMemory int64) { } func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP, port int) { + machineList := []string{"localhost"} + runApiServer(etcdClient, addr, port, *masterServiceNamespace) runScheduler(cl) - runControllerManager(cl, *nodeMilliCPU, *nodeMemory) + runControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) dockerClient := 
dockertools.ConnectToDockerOrDie(*dockerEndpoint) cadvisorInterface, err := cadvisor.New(0) if err != nil { glog.Fatalf("Failed to create cAdvisor: %v", err) } - kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, "localhost", "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.RealOS{}) + kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.RealOS{}) kubeletapp.RunKubelet(kcfg, nil) } diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index cccd4cf165b..f4fae7f13e3 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -1222,8 +1222,6 @@ func ValidateNodeUpdate(oldNode *api.Node, node *api.Node) errs.ValidationErrorL oldNode.ObjectMeta = node.ObjectMeta // Allow users to update capacity oldNode.Status.Capacity = node.Status.Capacity - // Allow the controller manager to assign a CIDR to a node. - oldNode.Spec.PodCIDR = node.Spec.PodCIDR // Allow users to unschedule node oldNode.Spec.Unschedulable = node.Spec.Unschedulable // Clear status diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index f62b31ff6c1..59418aeb40d 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -17,7 +17,6 @@ limitations under the License. package cloudprovider import ( - "errors" "net" "strings" @@ -87,8 +86,6 @@ type Instances interface { Release(name string) error } -var InstanceNotFound = errors.New("instance not found") - // Zone represents the location of a particular machine. type Zone struct { FailureDomain string diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 08f7a090ab9..f63569d7462 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -444,10 +444,7 @@ func (gce *GCECloud) getInstanceByName(name string) (*compute.Instance, error) { name = canonicalizeInstanceName(name) res, err := gce.service.Instances.Get(gce.projectID, gce.zone, name).Do() if err != nil { - glog.Errorf("Failed to retrieve TargetInstance resource for instance: %s", name) - if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == http.StatusNotFound { - return nil, cloudprovider.InstanceNotFound - } + glog.Errorf("Failed to retrieve TargetInstance resource for instance:%s", name) return nil, err } return res, nil diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go index e9aa54e0f89..55510ebfb26 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go @@ -20,9 +20,14 @@ import ( "errors" "fmt" "net" + "strings" + "sync" + "sync/atomic" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" @@ -52,6 +57,7 @@ type NodeController struct { cloud cloudprovider.Interface matchRE string staticResources *api.NodeResources + nodes []string kubeClient client.Interface recorder record.EventRecorder registerRetryCount int @@ -94,6 +100,7 @@ type NodeController struct { func NewNodeController( 
cloud cloudprovider.Interface, matchRE string, + nodes []string, staticResources *api.NodeResources, kubeClient client.Interface, registerRetryCount int, @@ -118,6 +125,7 @@ func NewNodeController( return &NodeController{ cloud: cloud, matchRE: matchRE, + nodes: nodes, staticResources: staticResources, kubeClient: kubeClient, recorder: recorder, @@ -136,9 +144,9 @@ func NewNodeController( } // Generates num pod CIDRs that could be assigned to nodes. -func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet { +func (nc *NodeController) generateCIDRs(num int) util.StringSet { res := util.NewStringSet() - cidrIP := clusterCIDR.IP.To4() + cidrIP := nc.clusterCIDR.IP.To4() for i := 0; i < num; i++ { // TODO: Make the CIDRs configurable. b1 := byte(i >> 8) @@ -148,46 +156,37 @@ func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet { return res } -// reconcilePodCIDRs looks at each node and assigns it a valid CIDR -// if it doesn't currently have one. -func (nc *NodeController) reconcilePodCIDRs(nodes *api.NodeList) { - glog.V(4).Infof("Reconciling pods cidrs for %d nodes", len(nodes.Items)) - // TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs - // on each sync period? - availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items)) - for _, node := range nodes.Items { - if node.Spec.PodCIDR != "" { - glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name) - availableCIDRs.Delete(node.Spec.PodCIDR) - } +// For each node from newNodes, finds its current spec in registeredNodes. +// If it is not there, it gets a new valid CIDR assigned. +func (nc *NodeController) reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList { + registeredCIDRs := make(map[string]string) + availableCIDRs := nc.generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items)) + for _, node := range registeredNodes.Items { + registeredCIDRs[node.Name] = node.Spec.PodCIDR + availableCIDRs.Delete(node.Spec.PodCIDR) } - for _, node := range nodes.Items { - if node.Spec.PodCIDR == "" { - podCIDR, found := availableCIDRs.PopAny() - if !found { - glog.Errorf("No available CIDR for node %s", node.Name) - continue - } - glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR) - node.Spec.PodCIDR = podCIDR - if err := nc.configureNodeCIDR(&node); err != nil { - glog.Errorf("Error configuring node %s: %s", node.Name, err) - // The newly assigned CIDR was not properly configured, so don't save it in the API server. - continue - } - if _, err := nc.kubeClient.Nodes().Update(&node); err != nil { - glog.Errorf("Unable to assign node %s CIDR %s: %v", node.Name, podCIDR, err) - } + for i, node := range newNodes.Items { + podCIDR, registered := registeredCIDRs[node.Name] + if !registered { + podCIDR, _ = availableCIDRs.PopAny() } + newNodes.Items[i].Spec.PodCIDR = podCIDR } + return newNodes } -func (nc *NodeController) configureNodeCIDR(node *api.Node) error { +func (nc *NodeController) configureNodeCIDR(node *api.Node) { instances, ok := nc.cloud.Instances() if !ok { - return fmt.Errorf("error configuring node %s: CloudProvider does not support Instances()", node.Name) + glog.Errorf("Error configuring node %s: CloudProvider does not support Instances()", node.Name) + return + } + err := instances.Configure(node.Name, &node.Spec) + if err != nil { + glog.Errorf("Error configuring node %s: %s", node.Name, err) + // The newly assigned CIDR was not properly configured, so don't save it in the API server. 
+ node.Spec.PodCIDR = "" } - return instances.Configure(node.Name, &node.Spec) } func (nc *NodeController) unassignNodeCIDR(nodeName string) { @@ -196,14 +195,59 @@ func (nc *NodeController) unassignNodeCIDR(nodeName string) { glog.Errorf("Error deconfiguring node %s: CloudProvider does not support Instances()", nodeName) return } - if err := instances.Release(nodeName); err != nil { + err := instances.Release(nodeName) + if err != nil { glog.Errorf("Error deconfiguring node %s: %s", nodeName, err) } } -// Run starts an asynchronous loop that monitors the status of cluster nodes. +// Run creates initial node list and start syncing instances from cloudprovider, if any. +// It also starts syncing or monitoring cluster node status. +// 1. registerNodes() is called only once to register all initial nodes (from cloudprovider +// or from command line flag). To make cluster bootstrap faster, node controller populates +// node addresses. +// 2. syncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider. +// Node created here will only have specs. +// 3. monitorNodeStatus() is called periodically to incorporate the results of node status +// pushed from kubelet to master. func (nc *NodeController) Run(period time.Duration, syncNodeList bool) { - // Incorporate the results of node status pushed from kubelet to master. + // Register intial set of nodes with their status set. + var nodes *api.NodeList + var err error + if nc.isRunningCloudProvider() { + if syncNodeList { + if nodes, err = nc.getCloudNodesWithSpec(); err != nil { + glog.Errorf("Error loading initial node from cloudprovider: %v", err) + } + } else { + nodes = &api.NodeList{} + } + } else { + if nodes, err = nc.getStaticNodesWithSpec(); err != nil { + glog.Errorf("Error loading initial static nodes: %v", err) + } + } + + if nodes, err = nc.populateAddresses(nodes); err != nil { + glog.Errorf("Error getting nodes ips: %v", err) + } + if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs { + nc.reconcilePodCIDRs(nodes, &api.NodeList{}) + } + if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil { + glog.Errorf("Error registering node list %+v: %v", nodes, err) + } + + // Start syncing node list from cloudprovider. + if syncNodeList && nc.isRunningCloudProvider() { + go util.Forever(func() { + if err := nc.syncCloudNodes(); err != nil { + glog.Errorf("Error syncing cloud: %v", err) + } + }, period) + } + + // Start monitoring node status. go util.Forever(func() { if err := nc.monitorNodeStatus(); err != nil { glog.Errorf("Error monitoring node status: %v", err) @@ -211,6 +255,165 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) { }, nc.nodeMonitorPeriod) } +// registerNodes registers the given list of nodes, it keeps retrying for `retryCount` times. 
+func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error { + if len(nodes.Items) == 0 { + return nil + } + nodes = nc.canonicalizeName(nodes) + toRegister := util.NewStringSet() + var wg sync.WaitGroup + var successfullyRegistered int32 = 0 + for i := range nodes.Items { + node := &nodes.Items[i] + if !toRegister.Has(node.Name) { + wg.Add(1) + toRegister.Insert(node.Name) + go func(n *api.Node) { + defer wg.Done() + for i := 0; i < retryCount; i++ { + if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs { + nc.configureNodeCIDR(n) + } + _, err := nc.kubeClient.Nodes().Create(n) + if err == nil || apierrors.IsAlreadyExists(err) { + glog.Infof("Registered node in registry: %v", n.Name) + atomic.AddInt32(&successfullyRegistered, 1) + return + } else { + glog.Errorf("Error registering node %v (retries left: %v): %v", n.Name, retryCount-i-1, err) + } + time.Sleep(retryInterval) + } + glog.Errorf("Unable to register node %v", n.Name) + }(node) + } + } + wg.Wait() + if int32(toRegister.Len()) != atomic.LoadInt32(&successfullyRegistered) { + return ErrRegistration + } else { + return nil + } +} + +// syncCloudNodes synchronizes the list of instances from cloudprovider to master server. +func (nc *NodeController) syncCloudNodes() error { + matches, err := nc.getCloudNodesWithSpec() + if err != nil { + return err + } + nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) + if err != nil { + return err + } + nodeMap := make(map[string]*api.Node) + nodeMapLock := sync.Mutex{} + for i := range nodes.Items { + node := nodes.Items[i] + nodeMapLock.Lock() + nodeMap[node.Name] = &node + nodeMapLock.Unlock() + } + if nc.allocateNodeCIDRs { + nc.reconcilePodCIDRs(matches, nodes) + } + var wg sync.WaitGroup + wg.Add(len(matches.Items)) + // Create nodes which have been created in cloud, but not in kubernetes cluster + // Skip nodes if we hit an error while trying to get their addresses. + for i := range matches.Items { + go func(node *api.Node) { + defer wg.Done() + nodeMapLock.Lock() + _, ok := nodeMap[node.Name] + nodeMapLock.Unlock() + if !ok { + glog.V(3).Infof("Querying addresses for new node: %s", node.Name) + nodeList := &api.NodeList{} + nodeList.Items = []api.Node{*node} + _, err = nc.populateAddresses(nodeList) + if err != nil { + glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err) + return + } + node.Status.Addresses = nodeList.Items[0].Status.Addresses + if nc.allocateNodeCIDRs { + nc.configureNodeCIDR(node) + } + glog.Infof("Create node in registry: %s", node.Name) + _, err = nc.kubeClient.Nodes().Create(node) + if err != nil { + glog.Errorf("Create node %s error: %v", node.Name, err) + } + } + nodeMapLock.Lock() + delete(nodeMap, node.Name) + nodeMapLock.Unlock() + }(&matches.Items[i]) + } + wg.Wait() + + wg.Add(len(nodeMap)) + // Delete nodes which have been deleted from cloud, but not from kubernetes cluster. + for nodeID := range nodeMap { + go func(nodeID string) { + defer wg.Done() + if nc.allocateNodeCIDRs { + nc.unassignNodeCIDR(nodeID) + } + glog.Infof("Delete node from registry: %s", nodeID) + err = nc.kubeClient.Nodes().Delete(nodeID) + if err != nil { + glog.Errorf("Delete node %s error: %v", nodeID, err) + } + nc.deletePods(nodeID) + }(nodeID) + } + wg.Wait() + + return nil +} + +// populateAddresses queries Address for given list of nodes. 
+func (nc *NodeController) populateAddresses(nodes *api.NodeList) (*api.NodeList, error) { + if nc.isRunningCloudProvider() { + instances, ok := nc.cloud.Instances() + if !ok { + return nodes, ErrCloudInstance + } + for i := range nodes.Items { + node := &nodes.Items[i] + nodeAddresses, err := instances.NodeAddresses(node.Name) + if err != nil { + glog.Errorf("error getting instance addresses for %s: %v", node.Name, err) + } else { + node.Status.Addresses = nodeAddresses + } + } + } else { + for i := range nodes.Items { + node := &nodes.Items[i] + addr := net.ParseIP(node.Name) + if addr != nil { + address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} + node.Status.Addresses = []api.NodeAddress{address} + } else { + addrs, err := nc.lookupIP(node.Name) + if err != nil { + glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) + } else if len(addrs) == 0 { + glog.Errorf("No ip address for node %v", node.Name) + } else { + address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()} + node.Status.Addresses = []api.NodeAddress{address} + } + } + } + } + return nodes, nil +} + func (nc *NodeController) recordNodeEvent(node *api.Node, event string) { ref := &api.ObjectReference{ Kind: "Node", @@ -364,9 +567,6 @@ func (nc *NodeController) monitorNodeStatus() error { if err != nil { return err } - if nc.allocateNodeCIDRs { - nc.reconcilePodCIDRs(nodes) - } for i := range nodes.Items { var gracePeriod time.Duration var lastReadyCondition api.NodeCondition @@ -395,12 +595,10 @@ func (nc *NodeController) monitorNodeStatus() error { if lastReadyCondition.Status == api.ConditionFalse && nc.now().After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) { // Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node. - // Makes sure we are not removing pods from too many nodes in the same time. + // Makes sure we are not removing pods from to many nodes in the same time. glog.Infof("Evicting pods: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout) if nc.deletingPodsRateLimiter.CanAccept() { - if err := nc.deletePods(node.Name); err != nil { - glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) - } + nc.deletePods(node.Name) } } if lastReadyCondition.Status == api.ConditionUnknown && @@ -409,9 +607,7 @@ func (nc *NodeController) monitorNodeStatus() error { // need to substract monitoring grace period in order to get the real 'podEvictionTimeout'. glog.Infof("Evicting pods2: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod) if nc.deletingPodsRateLimiter.CanAccept() { - if err := nc.deletePods(node.Name); err != nil { - glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) - } + nc.deletePods(node.Name) } } @@ -425,33 +621,71 @@ func (nc *NodeController) monitorNodeStatus() error { if readyCondition.Status == api.ConditionUnknown && lastReadyCondition.Status != api.ConditionUnknown { nc.recordNodeEvent(node, "unknown") } - - // Check with the cloud provider to see if the node still exists. If it - // doesn't, delete the node and all pods scheduled on the node. 
- if readyCondition.Status != api.ConditionTrue && nc.cloud != nil { - instances, ok := nc.cloud.Instances() - if !ok { - glog.Errorf("%v", ErrCloudInstance) - continue - } - if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound { - if nc.allocateNodeCIDRs { - nc.unassignNodeCIDR(node.Name) - } - if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil { - glog.Errorf("Unable to delete node %s: %v", node.Name, err) - continue - } - if err := nc.deletePods(node.Name); err != nil { - glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) - } - } - } } } return nil } +// getStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error +// occurs, an empty NodeList will be returned with a non-nil error info. The method only +// constructs spec fields for nodes. +func (nc *NodeController) getStaticNodesWithSpec() (*api.NodeList, error) { + result := &api.NodeList{} + for _, nodeID := range nc.nodes { + node := api.Node{ + ObjectMeta: api.ObjectMeta{Name: nodeID}, + Spec: api.NodeSpec{ + ExternalID: nodeID, + }, + Status: api.NodeStatus{ + Capacity: nc.staticResources.Capacity, + }, + } + result.Items = append(result.Items, node) + } + return result, nil +} + +// getCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error +// occurs, an empty NodeList will be returned with a non-nil error info. The method only +// constructs spec fields for nodes. +func (nc *NodeController) getCloudNodesWithSpec() (*api.NodeList, error) { + result := &api.NodeList{} + instances, ok := nc.cloud.Instances() + if !ok { + return result, ErrCloudInstance + } + matches, err := instances.List(nc.matchRE) + if err != nil { + return result, err + } + for i := range matches { + node := api.Node{} + node.Name = matches[i] + resources, err := instances.GetNodeResources(matches[i]) + if err != nil { + return nil, err + } + if resources == nil { + resources = nc.staticResources + } + if resources != nil { + node.Status.Capacity = resources.Capacity + if node.Status.Capacity != nil { + node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(0, resource.DecimalSI) + } + } + instanceID, err := instances.ExternalID(node.Name) + if err != nil { + glog.Errorf("Error getting instance id for %s: %v", node.Name, err) + } else { + node.Spec.ExternalID = instanceID + } + result.Items = append(result.Items, node) + } + return result, nil +} + // deletePods will delete all pods from master running on given node. func (nc *NodeController) deletePods(nodeID string) error { glog.V(2).Infof("Delete all pods from %v", nodeID) @@ -474,6 +708,19 @@ func (nc *NodeController) deletePods(nodeID string) error { return nil } +// isRunningCloudProvider checks if cluster is running with cloud provider. +func (nc *NodeController) isRunningCloudProvider() bool { + return nc.cloud != nil && len(nc.matchRE) > 0 +} + +// canonicalizeName takes a node list and lowercases all nodes' name. +func (nc *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { + for i := range nodes.Items { + nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name) + } + return nodes +} + // getCondition returns a condition object for the specific condition // type, nil if the condition is not set. 
func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition { diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go index d9ec66c84e2..34e5b22fb54 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go @@ -19,6 +19,7 @@ package nodecontroller import ( "errors" "fmt" + "reflect" "sort" "sync" "testing" @@ -29,6 +30,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -140,6 +142,506 @@ func (m *FakeNodeHandler) Watch(label labels.Selector, field fields.Selector, re return nil, nil } +func TestRegisterNodes(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + machines []string + retryCount int + expectedRequestCount int + expectedCreateCount int + expectedFail bool + }{ + { + // Register two nodes normally. + machines: []string{"node0", "node1"}, + fakeNodeHandler: &FakeNodeHandler{ + CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true }, + }, + retryCount: 1, + expectedRequestCount: 2, + expectedCreateCount: 2, + expectedFail: false, + }, + { + // Canonicalize node names. + machines: []string{"NODE0", "node1"}, + fakeNodeHandler: &FakeNodeHandler{ + CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { + if node.Name == "NODE0" { + return false + } + return true + }, + }, + retryCount: 1, + expectedRequestCount: 2, + expectedCreateCount: 2, + expectedFail: false, + }, + { + // No machine to register. + machines: []string{}, + fakeNodeHandler: &FakeNodeHandler{ + CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true }, + }, + retryCount: 1, + expectedRequestCount: 0, + expectedCreateCount: 0, + expectedFail: false, + }, + { + // Fail the first two requests. + machines: []string{"node0", "node1"}, + fakeNodeHandler: &FakeNodeHandler{ + CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { + if fake.RequestCount == 0 || fake.RequestCount == 1 { + return false + } + return true + }, + }, + retryCount: 10, + expectedRequestCount: 4, + expectedCreateCount: 2, + expectedFail: false, + }, + { + // One node already exists + machines: []string{"node0", "node1"}, + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node1", + }, + }, + }, + }, + retryCount: 10, + expectedRequestCount: 2, + expectedCreateCount: 1, + expectedFail: false, + }, + { + // The first node always fails. 
+ machines: []string{"node0", "node1"}, + fakeNodeHandler: &FakeNodeHandler{ + CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { + if node.Name == "node0" { + return false + } + return true + }, + }, + retryCount: 2, + expectedRequestCount: 3, // 2 for node0, 1 for node1 + expectedCreateCount: 1, + expectedFail: true, + }, + } + + for _, item := range table { + nodes := api.NodeList{} + for _, machine := range item.machines { + nodes.Items = append(nodes.Items, *newNode(machine)) + } + nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + err := nodeController.registerNodes(&nodes, item.retryCount, time.Millisecond) + if !item.expectedFail && err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.expectedFail && err == nil { + t.Errorf("unexpected non-error") + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v calls, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + if len(item.fakeNodeHandler.CreatedNodes) != item.expectedCreateCount { + t.Errorf("expected %v nodes, but got %v.", item.expectedCreateCount, item.fakeNodeHandler.CreatedNodes) + } + } +} + +func TestCreateGetStaticNodesWithSpec(t *testing.T) { + table := []struct { + machines []string + expectedNodes *api.NodeList + }{ + { + machines: []string{}, + expectedNodes: &api.NodeList{}, + }, + { + machines: []string{"node0"}, + expectedNodes: &api.NodeList{ + Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Spec: api.NodeSpec{ + ExternalID: "node0", + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + }, + }, + }, + }, + }, + }, + { + machines: []string{"node0", "node1"}, + expectedNodes: &api.NodeList{ + Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Spec: api.NodeSpec{ + ExternalID: "node0", + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "node1"}, + Spec: api.NodeSpec{ + ExternalID: "node1", + }, + Status: api.NodeStatus{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + }, + }, + }, + }, + }, + }, + } + + resources := api.NodeResources{ + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceCPU): resource.MustParse("10"), + api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), + }, + } + for _, item := range table { + nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute, + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + nodes, err := nodeController.getStaticNodesWithSpec() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(item.expectedNodes, nodes) { + t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes) + } + } +} + +func TestCreateGetCloudNodesWithSpec(t *testing.T) { + resourceList := api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(1000, resource.DecimalSI), + 
api.ResourceMemory: *resource.NewQuantity(3000, resource.DecimalSI), + } + + table := []struct { + fakeCloud *fake_cloud.FakeCloud + machines []string + expectedNodes *api.NodeList + }{ + { + fakeCloud: &fake_cloud.FakeCloud{}, + expectedNodes: &api.NodeList{}, + }, + { + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + NodeResources: &api.NodeResources{Capacity: resourceList}, + }, + expectedNodes: &api.NodeList{ + Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{Capacity: resourceList}, + }, + }, + }, + }, + { + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0", "node1"}, + NodeResources: &api.NodeResources{Capacity: resourceList}, + }, + expectedNodes: &api.NodeList{ + Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{Capacity: resourceList}, + }, + { + ObjectMeta: api.ObjectMeta{Name: "node1"}, + Status: api.NodeStatus{Capacity: resourceList}, + }, + }, + }, + }, + } + + for _, item := range table { + nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute, + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + nodes, err := nodeController.getCloudNodesWithSpec() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(item.expectedNodes, nodes) { + t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes) + } + } +} + +func TestSyncCloudNodes(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + fakeCloud *fake_cloud.FakeCloud + matchRE string + expectedRequestCount int + expectedNameCreated []string + expectedExtIDCreated []string + expectedAddrsCreated []string + expectedDeleted []string + }{ + { + // 1 existing node, 1 cloud nodes: do nothing. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0")}, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + ExtID: map[string]string{ + "node0": "ext-node0", + "node1": "ext-node1", + }, + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + }, + matchRE: ".*", + expectedRequestCount: 1, // List + expectedNameCreated: []string{}, + expectedExtIDCreated: []string{}, + expectedAddrsCreated: []string{}, + expectedDeleted: []string{}, + }, + { + // 1 existing node, 2 cloud nodes: create 1. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0")}, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0", "node1"}, + ExtID: map[string]string{ + "node0": "ext-node0", + "node1": "ext-node1", + }, + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + }, + matchRE: ".*", + expectedRequestCount: 2, // List + Create + expectedNameCreated: []string{"node1"}, + expectedExtIDCreated: []string{"ext-node1"}, + expectedAddrsCreated: []string{"1.2.3.4"}, + expectedDeleted: []string{}, + }, + { + // 2 existing nodes, 1 cloud node: delete 1. 
+ fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + ExtID: map[string]string{ + "node0": "ext-node0", + "node1": "ext-node1", + }, + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + }, + matchRE: ".*", + expectedRequestCount: 2, // List + Delete + expectedNameCreated: []string{}, + expectedExtIDCreated: []string{}, + expectedAddrsCreated: []string{}, + expectedDeleted: []string{"node1"}, + }, + { + // 1 existing node, 3 cloud nodes but only 2 match regex: delete 1. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0")}, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0", "node1", "fake"}, + ExtID: map[string]string{ + "node0": "ext-node0", + "node1": "ext-node1", + "fake": "ext-fake", + }, + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + }, + matchRE: "node[0-9]+", + expectedRequestCount: 2, // List + Create + expectedNameCreated: []string{"node1"}, + expectedExtIDCreated: []string{"ext-node1"}, + expectedAddrsCreated: []string{"1.2.3.4"}, + expectedDeleted: []string{}, + }, + } + + for _, item := range table { + if item.fakeNodeHandler.Fake == nil { + item.fakeNodeHandler.Fake = testclient.NewSimpleFake() + } + nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + if err := nodeController.syncCloudNodes(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + nodes := sortedNodeNames(item.fakeNodeHandler.CreatedNodes) + if !reflect.DeepEqual(item.expectedNameCreated, nodes) { + t.Errorf("expected node list %+v, got %+v", item.expectedNameCreated, nodes) + } + nodeExtIDs := sortedNodeExternalIDs(item.fakeNodeHandler.CreatedNodes) + if !reflect.DeepEqual(item.expectedExtIDCreated, nodeExtIDs) { + t.Errorf("expected node external id list %+v, got %+v", item.expectedExtIDCreated, nodeExtIDs) + } + nodeAddrs := sortedNodeAddresses(item.fakeNodeHandler.CreatedNodes) + if !reflect.DeepEqual(item.expectedAddrsCreated, nodeAddrs) { + t.Errorf("expected node address list %+v, got %+v", item.expectedAddrsCreated, nodeAddrs) + } + nodes = sortedNodeNames(item.fakeNodeHandler.DeletedNodes) + if !reflect.DeepEqual(item.expectedDeleted, nodes) { + t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes) + } + } +} + +func TestSyncCloudNodesEvictPods(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + fakeCloud *fake_cloud.FakeCloud + matchRE string + expectedRequestCount int + expectedDeleted []string + expectedActions []testclient.FakeAction + }{ + { + // No node to delete: do nothing. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0", "node1"}, + }, + matchRE: ".*", + expectedRequestCount: 1, // List + expectedDeleted: []string{}, + expectedActions: nil, + }, + { + // Delete node1, and pod0 is running on it. 
+ fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node1")}}), + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + }, + matchRE: ".*", + expectedRequestCount: 2, // List + Delete + expectedDeleted: []string{"node1"}, + expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, + }, + { + // Delete node1, but pod0 is running on node0. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}), + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + }, + matchRE: ".*", + expectedRequestCount: 2, // List + Delete + expectedDeleted: []string{"node1"}, + expectedActions: []testclient.FakeAction{{Action: "list-pods"}}, + }, + } + + for _, item := range table { + if item.fakeNodeHandler.Fake == nil { + item.fakeNodeHandler.Fake = testclient.NewSimpleFake() + } + nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + if err := nodeController.syncCloudNodes(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + nodes := sortedNodeNames(item.fakeNodeHandler.DeletedNodes) + if !reflect.DeepEqual(item.expectedDeleted, nodes) { + t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes) + } + if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) { + t.Errorf("time out waiting for deleting pods, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions) + } + } +} + +func TestPopulateNodeAddresses(t *testing.T) { + table := []struct { + nodes *api.NodeList + fakeCloud *fake_cloud.FakeCloud + expectedFail bool + expectedAddresses []api.NodeAddress + }{ + { + nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}}, + fakeCloud: &fake_cloud.FakeCloud{Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}}, + expectedAddresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + }, + }, + { + nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}}, + fakeCloud: &fake_cloud.FakeCloud{Err: ErrQueryIPAddress}, + expectedAddresses: nil, + }, + } + + for _, item := range table { + nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute, + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) + result, err := nodeController.populateAddresses(item.nodes) + // In case of IP querying error, we should continue. 
+ if err != nil { + t.Errorf("unexpected error: %v", err) + } + for _, node := range result.Items { + if !reflect.DeepEqual(item.expectedAddresses, node.Status.Addresses) { + t.Errorf("expect HostIP %s, got %s", item.expectedAddresses, node.Status.Addresses) + } + } + } +} + func TestMonitorNodeStatusEvictPods(t *testing.T) { fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) evictionTimeout := 10 * time.Minute @@ -324,7 +826,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", nil, item.fakeNodeHandler, 10, + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } @@ -527,7 +1029,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(), + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d68d15a7d5c..6cb89206e5c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -31,7 +31,6 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -67,6 +66,11 @@ const ( // Max amount of time to wait for the container runtime to come up. maxWaitForContainerRuntime = 5 * time.Minute + // Initial node status update frequency and incremental frequency, for faster cluster startup. + // The update frequency will be increameted linearly, until it reaches status_update_frequency. + initialNodeStatusUpdateFrequency = 100 * time.Millisecond + nodeStatusUpdateFrequencyInc = 500 * time.Millisecond + // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. nodeStatusUpdateRetry = 5 @@ -118,7 +122,6 @@ func NewMainKubelet( pullBurst int, containerGCPolicy ContainerGCPolicy, sourcesReady SourcesReadyFn, - registerNode bool, clusterDomain string, clusterDNS net.IP, masterServiceNamespace string, @@ -221,7 +224,6 @@ func NewMainKubelet( readinessManager: readinessManager, httpClient: &http.Client{}, sourcesReady: sourcesReady, - registerNode: registerNode, clusterDomain: clusterDomain, clusterDNS: clusterDNS, serviceLister: serviceLister, @@ -371,9 +373,6 @@ type Kubelet struct { // cAdvisor used for container information. cadvisor cadvisor.Interface - // Set to true to have the node register itself with the apiserver. - registerNode bool - // If non-empty, use this for container DNS search. 
clusterDomain string @@ -658,22 +657,26 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { glog.Infof("Running in container %q", kl.resourceContainer) } - if err := kl.imageManager.Start(); err != nil { + err := kl.imageManager.Start() + if err != nil { kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ImageManager %v", err) glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err) } - if err := kl.cadvisor.Start(); err != nil { + err = kl.cadvisor.Start() + if err != nil { kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start CAdvisor %v", err) glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err) } - if err := kl.containerManager.Start(); err != nil { + err = kl.containerManager.Start() + if err != nil { kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ContainerManager %v", err) glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err) } - if err := kl.oomWatcher.Start(kl.nodeRef); err != nil { + err = kl.oomWatcher.Start(kl.nodeRef) + if err != nil { kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start OOM watcher %v", err) glog.Errorf("Failed to start OOM watching: %v", err) } @@ -685,83 +688,20 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { kl.syncLoop(updates, kl) } -func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { - node := &api.Node{ - ObjectMeta: api.ObjectMeta{ - Name: kl.hostname, - Labels: map[string]string{"kubernetes.io/hostname": kl.hostname}, - }, - } - if kl.cloud != nil { - instances, ok := kl.cloud.Instances() - if !ok { - return nil, fmt.Errorf("failed to get instances from cloud provider") - } - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? - instanceID, err := instances.ExternalID(kl.hostname) - if err != nil { - return nil, fmt.Errorf("failed to get instance ID from cloud provider: %v", err) - } - node.Spec.ExternalID = instanceID - } else { - node.Spec.ExternalID = kl.hostname - } - if err := kl.setNodeStatus(node); err != nil { - return nil, err - } - return node, nil -} - -// registerWithApiserver registers the node with the cluster master. -func (kl *Kubelet) registerWithApiserver() { - step := 100 * time.Millisecond - for { - time.Sleep(step) - step = step * 2 - if step >= 7*time.Second { - step = 7 * time.Second - } - - node, err := kl.initialNodeStatus() - if err != nil { - glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) - continue - } - glog.V(2).Infof("Attempting to register node %s", node.Name) - if _, err := kl.kubeClient.Nodes().Create(node); err != nil { - if apierrors.IsAlreadyExists(err) { - currentNode, err := kl.kubeClient.Nodes().Get(kl.hostname) - if err != nil { - glog.Errorf("error getting node %q: %v", kl.hostname, err) - continue - } - if currentNode == nil { - glog.Errorf("no node instance returned for %q", kl.hostname) - continue - } - if currentNode.Spec.ExternalID == node.Spec.ExternalID { - glog.Infof("Node %s was previously registered", node.Name) - return - } - } - glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) - continue - } - glog.Infof("Successfully registered node %s", node.Name) - return - } -} - // syncNodeStatus periodically synchronizes node status to master. 
func (kl *Kubelet) syncNodeStatus() { if kl.kubeClient == nil { return } - if kl.registerNode { - kl.registerWithApiserver() - } glog.Infof("Starting node status updates") + for feq := initialNodeStatusUpdateFrequency; feq < kl.nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc { + select { + case <-time.After(feq): + if err := kl.updateNodeStatus(); err != nil { + glog.Errorf("Unable to update node status: %v", err) + } + } + } for { select { case <-time.After(kl.nodeStatusUpdateFrequency): @@ -1767,13 +1707,14 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error { // updateNodeStatus updates node status to master with retries. func (kl *Kubelet) updateNodeStatus() error { for i := 0; i < nodeStatusUpdateRetry; i++ { - if err := kl.tryUpdateNodeStatus(); err != nil { - glog.Errorf("Error updating node status, will retry: %v", err) + err := kl.tryUpdateNodeStatus() + if err != nil { + glog.Errorf("error updating node status, will retry: %v", err) } else { return nil } } - return fmt.Errorf("update node status exceeds retry count") + return fmt.Errorf("Update node status exceeds retry count") } func (kl *Kubelet) recordNodeOnlineEvent() { @@ -1797,36 +1738,15 @@ func (kl *Kubelet) recordNodeUnschedulableEvent() { // Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus() var oldNodeUnschedulable bool -// setNodeStatus fills in the Status fields of the given Node, overwriting -// any fields that are currently set. -func (kl *Kubelet) setNodeStatus(node *api.Node) error { - // Set addresses for the node. - if kl.cloud != nil { - instances, ok := kl.cloud.Instances() - if !ok { - return fmt.Errorf("failed to get instances from cloud provider") - } - // TODO(roberthbailey): Can we do this without having credentials to talk - // to the cloud provider? - nodeAddresses, err := instances.NodeAddresses(kl.hostname) - if err != nil { - return fmt.Errorf("failed to get node address from cloud provider: %v", err) - } - node.Status.Addresses = nodeAddresses - } else { - addr := net.ParseIP(kl.hostname) - if addr != nil { - node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addr.String()}} - } else { - addrs, err := net.LookupIP(node.Name) - if err != nil { - return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err) - } else if len(addrs) == 0 { - return fmt.Errorf("no ip address for node %v", node.Name) - } else { - node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}} - } - } +// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 +// is set, this function will also confirm that cbr0 is configured correctly. +func (kl *Kubelet) tryUpdateNodeStatus() error { + node, err := kl.kubeClient.Nodes().Get(kl.hostname) + if err != nil { + return fmt.Errorf("error getting node %q: %v", kl.hostname, err) + } + if node == nil { + return fmt.Errorf("no node instance returned for %q", kl.hostname) } networkConfigured := true @@ -1841,13 +1761,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error { // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. info, err := kl.GetCachedMachineInfo() if err != nil { - // TODO(roberthbailey): This is required for test-cmd.sh to pass. - // See if the test should be updated instead. 
- node.Status.Capacity = api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), - api.ResourceMemory: resource.MustParse("0Gi"), - } - glog.Errorf("Error getting machine info: %v", err) + glog.Errorf("error getting machine info: %v", err) } else { node.Status.NodeInfo.MachineID = info.MachineID node.Status.NodeInfo.SystemUUID = info.SystemUUID @@ -1866,7 +1780,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error { verinfo, err := kl.cadvisor.VersionInfo() if err != nil { - glog.Errorf("Error getting version info: %v", err) + glog.Errorf("error getting version info: %v", err) } else { node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion @@ -1934,22 +1848,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error { } oldNodeUnschedulable = node.Spec.Unschedulable } - return nil -} -// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 -// is set, this function will also confirm that cbr0 is configured correctly. -func (kl *Kubelet) tryUpdateNodeStatus() error { - node, err := kl.kubeClient.Nodes().Get(kl.hostname) - if err != nil { - return fmt.Errorf("error getting node %q: %v", kl.hostname, err) - } - if node == nil { - return fmt.Errorf("no node instance returned for %q", kl.hostname) - } - if err := kl.setNodeStatus(node); err != nil { - return err - } // Update the current status on the API server _, err = kl.kubeClient.Nodes().UpdateStatus(node) return err diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 7e6713a95a0..c7f0a733b8b 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -34,7 +34,6 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" @@ -45,7 +44,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" @@ -3241,7 +3239,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}}, + {ObjectMeta: api.ObjectMeta{Name: "testnode"}}, }}).ReactFn machineInfo := &cadvisorApi.MachineInfo{ MachineID: "123", @@ -3259,7 +3257,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { } mockCadvisor.On("VersionInfo").Return(versionInfo, nil) expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}, + ObjectMeta: api.ObjectMeta{Name: "testnode"}, Spec: api.NodeSpec{}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -3286,7 +3284,6 @@ func TestUpdateNewNodeStatus(t *testing.T) { api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, - Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}}, }, } @@ -3320,7 +3317,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) 
 	kubeClient := testKubelet.fakeKubeClient
 	kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
 		{
-			ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
+			ObjectMeta: api.ObjectMeta{Name: "testnode"},
 			Spec:       api.NodeSpec{},
 			Status: api.NodeStatus{
 				Conditions: []api.NodeCondition{
@@ -3356,7 +3353,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 	}
 	mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
 	expectedNode := &api.Node{
-		ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
+		ObjectMeta: api.ObjectMeta{Name: "testnode"},
 		Spec:       api.NodeSpec{},
 		Status: api.NodeStatus{
 			Conditions: []api.NodeCondition{
@@ -3383,7 +3380,6 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 				api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
 				api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI),
 			},
-			Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}},
 		},
 	}
 
@@ -3423,7 +3419,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
 	fakeDocker.VersionInfo = []string{}
 
 	kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{
-		{ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}},
+		{ObjectMeta: api.ObjectMeta{Name: "testnode"}},
 	}}).ReactFn
 	mockCadvisor := testKubelet.fakeCadvisor
 	machineInfo := &cadvisorApi.MachineInfo{
@@ -3442,7 +3438,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
 
 	mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
 	expectedNode := &api.Node{
-		ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
+		ObjectMeta: api.ObjectMeta{Name: "testnode"},
 		Spec:       api.NodeSpec{},
 		Status: api.NodeStatus{
 			Conditions: []api.NodeCondition{
@@ -3469,7 +3465,6 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) {
 				api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI),
 				api.ResourcePods:   *resource.NewQuantity(0, resource.DecimalSI),
 			},
-			Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}},
 		},
 	}
 
@@ -4407,62 +4402,6 @@ func TestFilterOutTerminatedPods(t *testing.T) {
 	}
 }
 
-func TestRegisterExistingNodeWithApiserver(t *testing.T) {
-	testKubelet := newTestKubelet(t)
-	kubelet := testKubelet.kubelet
-	kubelet.hostname = "127.0.0.1"
-	kubeClient := testKubelet.fakeKubeClient
-	kubeClient.ReactFn = func(action testclient.FakeAction) (runtime.Object, error) {
-		segments := strings.Split(action.Action, "-")
-		if len(segments) < 2 {
-			return nil, fmt.Errorf("unrecognized action, need two or three segments <verb>-<resource> or <verb>-<subresource>-<resource>: %s", action.Action)
-		}
-		verb := segments[0]
-		switch verb {
-		case "create":
-			// Return an error on create.
-			return &api.Node{}, &apierrors.StatusError{
-				ErrStatus: api.Status{Reason: api.StatusReasonAlreadyExists},
-			}
-		case "get":
-			// Return an existing (matching) node on get.
-			return &api.Node{
-				ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"},
-				Spec:       api.NodeSpec{ExternalID: "127.0.0.1"},
-			}, nil
-		default:
-			return nil, fmt.Errorf("no reaction implemented for %s", action.Action)
-		}
-	}
-	machineInfo := &cadvisorApi.MachineInfo{
-		MachineID:      "123",
-		SystemUUID:     "abc",
-		BootID:         "1b3",
-		NumCores:       2,
-		MemoryCapacity: 1024,
-	}
-	mockCadvisor := testKubelet.fakeCadvisor
-	mockCadvisor.On("MachineInfo").Return(machineInfo, nil)
-	versionInfo := &cadvisorApi.VersionInfo{
-		KernelVersion:      "3.16.0-0.bpo.4-amd64",
-		ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)",
-		DockerVersion:      "1.5.0",
-	}
-	mockCadvisor.On("VersionInfo").Return(versionInfo, nil)
-
-	done := make(chan struct{})
-	go func() {
-		kubelet.registerWithApiserver()
-		done <- struct{}{}
-	}()
-	select {
-	case <-time.After(5 * time.Second):
-		t.Errorf("timed out waiting for registration")
-	case <-done:
-		return
-	}
-}
-
 func TestMakePortMappings(t *testing.T) {
 	tests := []struct {
 		container *api.Container