diff --git a/cluster/saltbase/salt/kubelet/default b/cluster/saltbase/salt/kubelet/default index c39b050ecd8..a1571bd0319 100644 --- a/cluster/saltbase/salt/kubelet/default +++ b/cluster/saltbase/salt/kubelet/default @@ -22,6 +22,15 @@ {% set api_servers_with_port = api_servers + ":6443" -%} {% endif -%} +# Disable registration for the kubelet running on the master on GCE. +# TODO(roberthbailey): Make this configurable via an env var in config-default.sh +{% if grains.cloud == 'gce' -%} + {% if grains['roles'][0] == 'kubernetes-master' -%} + {% set api_servers_with_port = "" -%} + {% endif -%} +{% endif -%} + + {% set config = "--config=/etc/kubernetes/manifests" -%} {% set hostname_override = "" -%} {% if grains.hostname_override is defined -%} diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 7d8c79f9f64..cc6fda5f062 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -101,7 +101,6 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st // Setup servers := []string{} glog.Infof("Creating etcd client pointing to %v", servers) - machineList := []string{"localhost", "127.0.0.1"} handler := delegateHandler{} apiServer := httptest.NewServer(&handler) @@ -196,7 +195,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), }} - nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), + nodeController := nodecontroller.NewNodeController(nil, "", nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(5*time.Second, true) cadvisorInterface := new(cadvisor.Fake) @@ -206,7 +205,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st configFilePath := makeTempDirOrDie("config", testRootDir) glog.Infof("Using %s as root dir for kubelet #1", testRootDir) fakeDocker1.VersionInfo = docker.Env{"ApiVersion=1.15"} - kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, machineList[0], testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{}) + kcfg := kubeletapp.SimpleKubelet(cl, &fakeDocker1, "localhost", testRootDir, firstManifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath, nil, kubecontainer.FakeOS{}) kubeletapp.RunKubelet(kcfg, nil) // Kubelet (machine) // Create a second kubelet so that the guestbook example's two redis slaves both @@ -214,7 +213,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st testRootDir = makeTempDirOrDie("kubelet_integ_2.", "") glog.Infof("Using %s as root dir for kubelet #2", testRootDir) fakeDocker2.VersionInfo = docker.Env{"ApiVersion=1.15"} - kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, machineList[1], testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{}) + kcfg = kubeletapp.SimpleKubelet(cl, &fakeDocker2, "127.0.0.1", testRootDir, secondManifestURL, "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.FakeOS{}) kubeletapp.RunKubelet(kcfg, nil) return apiServer.URL, configFilePath } diff --git 
a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index c9a7276f523..11278bd084f 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -228,7 +228,7 @@ func (s *CMServer) Run(_ []string) error { glog.Warning("DEPRECATION NOTICE: sync-node-status flag is being deprecated. It has no effect now and it will be removed in a future version.") } - nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, + nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, nodeResources, kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 668b1b45aa0..817d1b7fc75 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -89,6 +89,7 @@ type KubeletServer struct { HealthzBindAddress util.IP OOMScoreAdj int APIServerList util.StringList + RegisterNode bool ClusterDomain string MasterServiceNamespace string ClusterDNS util.IP @@ -155,6 +156,7 @@ func NewKubeletServer() *KubeletServer { CadvisorPort: 4194, HealthzPort: 10248, HealthzBindAddress: util.IP(net.ParseIP("127.0.0.1")), + RegisterNode: true, // will be ignored if no apiserver is configured OOMScoreAdj: -900, MasterServiceNamespace: api.NamespaceDefault, ImageGCHighThresholdPercent: 90, @@ -211,6 +213,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.Var(&s.HealthzBindAddress, "healthz-bind-address", "The IP address for the healthz server to serve on, defaulting to 127.0.0.1 (set to 0.0.0.0 for all interfaces)") fs.IntVar(&s.OOMScoreAdj, "oom-score-adj", s.OOMScoreAdj, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]") fs.Var(&s.APIServerList, "api-servers", "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.") + fs.BoolVar(&s.RegisterNode, "register-node", s.RegisterNode, "Register the node with the apiserver (defaults to true if --api-server is set)") fs.StringVar(&s.ClusterDomain, "cluster-domain", s.ClusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") fs.StringVar(&s.MasterServiceNamespace, "master-service-namespace", s.MasterServiceNamespace, "The namespace from which the kubernetes master services should be injected into pods") fs.Var(&s.ClusterDNS, "cluster-dns", "IP address for a cluster DNS server. 
If set, kubelet will configure all containers to use this for DNS resolution in addition to the host's DNS servers") @@ -318,6 +321,7 @@ func (s *KubeletServer) Run(_ []string) error { MinimumGCAge: s.MinimumGCAge, MaxPerPodContainerCount: s.MaxPerPodContainerCount, MaxContainerCount: s.MaxContainerCount, + RegisterNode: s.RegisterNode, ClusterDomain: s.ClusterDomain, ClusterDNS: s.ClusterDNS, Runonce: s.RunOnce, @@ -493,6 +497,7 @@ func SimpleKubelet(client *client.Client, MinimumGCAge: 10 * time.Second, MaxPerPodContainerCount: 5, MaxContainerCount: 100, + RegisterNode: true, MasterServiceNamespace: masterServiceNamespace, VolumePlugins: volumePlugins, TLSOptions: tlsOptions, @@ -618,6 +623,7 @@ type KubeletConfig struct { MinimumGCAge time.Duration MaxPerPodContainerCount int MaxContainerCount int + RegisterNode bool ClusterDomain string ClusterDNS util.IP EnableServer bool @@ -675,6 +681,7 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod kc.RegistryBurst, gcPolicy, pc.SeenAllSources, + kc.RegisterNode, kc.ClusterDomain, net.IP(kc.ClusterDNS), kc.MasterServiceNamespace, diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index aa8b746b4ec..1f0e7d71b7b 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -123,7 +123,7 @@ func runScheduler(cl *client.Client) { } // RunControllerManager starts a controller -func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, nodeMemory int64) { +func runControllerManager(cl *client.Client, nodeMilliCPU, nodeMemory int64) { nodeResources := &api.NodeResources{ Capacity: api.ResourceList{ api.ResourceCPU: *resource.NewMilliQuantity(nodeMilliCPU, resource.DecimalSI), @@ -133,7 +133,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, const nodeSyncPeriod = 10 * time.Second nodeController := nodecontroller.NewNodeController( - nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), + nil, "", nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(nodeSyncPeriod, true) @@ -150,18 +150,16 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, } func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP, port int) { - machineList := []string{"localhost"} - runApiServer(etcdClient, addr, port, *masterServiceNamespace) runScheduler(cl) - runControllerManager(machineList, cl, *nodeMilliCPU, *nodeMemory) + runControllerManager(cl, *nodeMilliCPU, *nodeMemory) dockerClient := dockertools.ConnectToDockerOrDie(*dockerEndpoint) cadvisorInterface, err := cadvisor.New(0) if err != nil { glog.Fatalf("Failed to create cAdvisor: %v", err) } - kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.RealOS{}) + kcfg := kubeletapp.SimpleKubelet(cl, dockerClient, "localhost", "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "", nil, kubecontainer.RealOS{}) kubeletapp.RunKubelet(kcfg, nil) } diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index bc47d2be2ee..392f951bfbb 100644 --- a/pkg/api/validation/validation.go +++ 
b/pkg/api/validation/validation.go @@ -1195,6 +1195,8 @@ func ValidateNodeUpdate(oldNode *api.Node, node *api.Node) errs.ValidationErrorL oldNode.ObjectMeta = node.ObjectMeta // Allow users to update capacity oldNode.Status.Capacity = node.Status.Capacity + // Allow the controller manager to assign a CIDR to a node. + oldNode.Spec.PodCIDR = node.Spec.PodCIDR // Allow users to unschedule node oldNode.Spec.Unschedulable = node.Spec.Unschedulable // Clear status diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 59418aeb40d..f62b31ff6c1 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -17,6 +17,7 @@ limitations under the License. package cloudprovider import ( + "errors" "net" "strings" @@ -86,6 +87,8 @@ type Instances interface { Release(name string) error } +var InstanceNotFound = errors.New("instance not found") + // Zone represents the location of a particular machine. type Zone struct { FailureDomain string diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index f63569d7462..08f7a090ab9 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -444,7 +444,10 @@ func (gce *GCECloud) getInstanceByName(name string) (*compute.Instance, error) { name = canonicalizeInstanceName(name) res, err := gce.service.Instances.Get(gce.projectID, gce.zone, name).Do() if err != nil { - glog.Errorf("Failed to retrieve TargetInstance resource for instance:%s", name) + glog.Errorf("Failed to retrieve TargetInstance resource for instance: %s", name) + if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == http.StatusNotFound { + return nil, cloudprovider.InstanceNotFound + } return nil, err } return res, nil diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go index 55510ebfb26..e9aa54e0f89 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go @@ -20,14 +20,9 @@ import ( "errors" "fmt" "net" - "strings" - "sync" - "sync/atomic" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" @@ -57,7 +52,6 @@ type NodeController struct { cloud cloudprovider.Interface matchRE string staticResources *api.NodeResources - nodes []string kubeClient client.Interface recorder record.EventRecorder registerRetryCount int @@ -100,7 +94,6 @@ type NodeController struct { func NewNodeController( cloud cloudprovider.Interface, matchRE string, - nodes []string, staticResources *api.NodeResources, kubeClient client.Interface, registerRetryCount int, @@ -125,7 +118,6 @@ func NewNodeController( return &NodeController{ cloud: cloud, matchRE: matchRE, - nodes: nodes, staticResources: staticResources, kubeClient: kubeClient, recorder: recorder, @@ -144,9 +136,9 @@ func NewNodeController( } // Generates num pod CIDRs that could be assigned to nodes. -func (nc *NodeController) generateCIDRs(num int) util.StringSet { +func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet { res := util.NewStringSet() - cidrIP := nc.clusterCIDR.IP.To4() + cidrIP := clusterCIDR.IP.To4() for i := 0; i < num; i++ { // TODO: Make the CIDRs configurable. 
b1 := byte(i >> 8) @@ -156,37 +148,46 @@ func (nc *NodeController) generateCIDRs(num int) util.StringSet { return res } -// For each node from newNodes, finds its current spec in registeredNodes. -// If it is not there, it gets a new valid CIDR assigned. -func (nc *NodeController) reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList { - registeredCIDRs := make(map[string]string) - availableCIDRs := nc.generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items)) - for _, node := range registeredNodes.Items { - registeredCIDRs[node.Name] = node.Spec.PodCIDR - availableCIDRs.Delete(node.Spec.PodCIDR) - } - for i, node := range newNodes.Items { - podCIDR, registered := registeredCIDRs[node.Name] - if !registered { - podCIDR, _ = availableCIDRs.PopAny() +// reconcilePodCIDRs looks at each node and assigns it a valid CIDR +// if it doesn't currently have one. +func (nc *NodeController) reconcilePodCIDRs(nodes *api.NodeList) { + glog.V(4).Infof("Reconciling pods cidrs for %d nodes", len(nodes.Items)) + // TODO(roberthbailey): This seems inefficient. Why re-calculate CIDRs + // on each sync period? + availableCIDRs := generateCIDRs(nc.clusterCIDR, len(nodes.Items)) + for _, node := range nodes.Items { + if node.Spec.PodCIDR != "" { + glog.V(4).Infof("CIDR %s is already being used by node %s", node.Spec.PodCIDR, node.Name) + availableCIDRs.Delete(node.Spec.PodCIDR) + } + } + for _, node := range nodes.Items { + if node.Spec.PodCIDR == "" { + podCIDR, found := availableCIDRs.PopAny() + if !found { + glog.Errorf("No available CIDR for node %s", node.Name) + continue + } + glog.V(4).Infof("Assigning node %s CIDR %s", node.Name, podCIDR) + node.Spec.PodCIDR = podCIDR + if err := nc.configureNodeCIDR(&node); err != nil { + glog.Errorf("Error configuring node %s: %s", node.Name, err) + // The newly assigned CIDR was not properly configured, so don't save it in the API server. + continue + } + if _, err := nc.kubeClient.Nodes().Update(&node); err != nil { + glog.Errorf("Unable to assign node %s CIDR %s: %v", node.Name, podCIDR, err) + } } - newNodes.Items[i].Spec.PodCIDR = podCIDR } - return newNodes } -func (nc *NodeController) configureNodeCIDR(node *api.Node) { +func (nc *NodeController) configureNodeCIDR(node *api.Node) error { instances, ok := nc.cloud.Instances() if !ok { - glog.Errorf("Error configuring node %s: CloudProvider does not support Instances()", node.Name) - return - } - err := instances.Configure(node.Name, &node.Spec) - if err != nil { - glog.Errorf("Error configuring node %s: %s", node.Name, err) - // The newly assigned CIDR was not properly configured, so don't save it in the API server. - node.Spec.PodCIDR = "" + return fmt.Errorf("error configuring node %s: CloudProvider does not support Instances()", node.Name) } + return instances.Configure(node.Name, &node.Spec) } func (nc *NodeController) unassignNodeCIDR(nodeName string) { @@ -195,59 +196,14 @@ func (nc *NodeController) unassignNodeCIDR(nodeName string) { glog.Errorf("Error deconfiguring node %s: CloudProvider does not support Instances()", nodeName) return } - err := instances.Release(nodeName) - if err != nil { + if err := instances.Release(nodeName); err != nil { glog.Errorf("Error deconfiguring node %s: %s", nodeName, err) } } -// Run creates initial node list and start syncing instances from cloudprovider, if any. -// It also starts syncing or monitoring cluster node status. -// 1. registerNodes() is called only once to register all initial nodes (from cloudprovider -// or from command line flag). 
To make cluster bootstrap faster, node controller populates -// node addresses. -// 2. syncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider. -// Node created here will only have specs. -// 3. monitorNodeStatus() is called periodically to incorporate the results of node status -// pushed from kubelet to master. +// Run starts an asynchronous loop that monitors the status of cluster nodes. func (nc *NodeController) Run(period time.Duration, syncNodeList bool) { - // Register intial set of nodes with their status set. - var nodes *api.NodeList - var err error - if nc.isRunningCloudProvider() { - if syncNodeList { - if nodes, err = nc.getCloudNodesWithSpec(); err != nil { - glog.Errorf("Error loading initial node from cloudprovider: %v", err) - } - } else { - nodes = &api.NodeList{} - } - } else { - if nodes, err = nc.getStaticNodesWithSpec(); err != nil { - glog.Errorf("Error loading initial static nodes: %v", err) - } - } - - if nodes, err = nc.populateAddresses(nodes); err != nil { - glog.Errorf("Error getting nodes ips: %v", err) - } - if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs { - nc.reconcilePodCIDRs(nodes, &api.NodeList{}) - } - if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil { - glog.Errorf("Error registering node list %+v: %v", nodes, err) - } - - // Start syncing node list from cloudprovider. - if syncNodeList && nc.isRunningCloudProvider() { - go util.Forever(func() { - if err := nc.syncCloudNodes(); err != nil { - glog.Errorf("Error syncing cloud: %v", err) - } - }, period) - } - - // Start monitoring node status. + // Incorporate the results of node status pushed from kubelet to master. go util.Forever(func() { if err := nc.monitorNodeStatus(); err != nil { glog.Errorf("Error monitoring node status: %v", err) @@ -255,165 +211,6 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) { }, nc.nodeMonitorPeriod) } -// registerNodes registers the given list of nodes, it keeps retrying for `retryCount` times. -func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error { - if len(nodes.Items) == 0 { - return nil - } - nodes = nc.canonicalizeName(nodes) - toRegister := util.NewStringSet() - var wg sync.WaitGroup - var successfullyRegistered int32 = 0 - for i := range nodes.Items { - node := &nodes.Items[i] - if !toRegister.Has(node.Name) { - wg.Add(1) - toRegister.Insert(node.Name) - go func(n *api.Node) { - defer wg.Done() - for i := 0; i < retryCount; i++ { - if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs { - nc.configureNodeCIDR(n) - } - _, err := nc.kubeClient.Nodes().Create(n) - if err == nil || apierrors.IsAlreadyExists(err) { - glog.Infof("Registered node in registry: %v", n.Name) - atomic.AddInt32(&successfullyRegistered, 1) - return - } else { - glog.Errorf("Error registering node %v (retries left: %v): %v", n.Name, retryCount-i-1, err) - } - time.Sleep(retryInterval) - } - glog.Errorf("Unable to register node %v", n.Name) - }(node) - } - } - wg.Wait() - if int32(toRegister.Len()) != atomic.LoadInt32(&successfullyRegistered) { - return ErrRegistration - } else { - return nil - } -} - -// syncCloudNodes synchronizes the list of instances from cloudprovider to master server. 
-func (nc *NodeController) syncCloudNodes() error { - matches, err := nc.getCloudNodesWithSpec() - if err != nil { - return err - } - nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything()) - if err != nil { - return err - } - nodeMap := make(map[string]*api.Node) - nodeMapLock := sync.Mutex{} - for i := range nodes.Items { - node := nodes.Items[i] - nodeMapLock.Lock() - nodeMap[node.Name] = &node - nodeMapLock.Unlock() - } - if nc.allocateNodeCIDRs { - nc.reconcilePodCIDRs(matches, nodes) - } - var wg sync.WaitGroup - wg.Add(len(matches.Items)) - // Create nodes which have been created in cloud, but not in kubernetes cluster - // Skip nodes if we hit an error while trying to get their addresses. - for i := range matches.Items { - go func(node *api.Node) { - defer wg.Done() - nodeMapLock.Lock() - _, ok := nodeMap[node.Name] - nodeMapLock.Unlock() - if !ok { - glog.V(3).Infof("Querying addresses for new node: %s", node.Name) - nodeList := &api.NodeList{} - nodeList.Items = []api.Node{*node} - _, err = nc.populateAddresses(nodeList) - if err != nil { - glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err) - return - } - node.Status.Addresses = nodeList.Items[0].Status.Addresses - if nc.allocateNodeCIDRs { - nc.configureNodeCIDR(node) - } - glog.Infof("Create node in registry: %s", node.Name) - _, err = nc.kubeClient.Nodes().Create(node) - if err != nil { - glog.Errorf("Create node %s error: %v", node.Name, err) - } - } - nodeMapLock.Lock() - delete(nodeMap, node.Name) - nodeMapLock.Unlock() - }(&matches.Items[i]) - } - wg.Wait() - - wg.Add(len(nodeMap)) - // Delete nodes which have been deleted from cloud, but not from kubernetes cluster. - for nodeID := range nodeMap { - go func(nodeID string) { - defer wg.Done() - if nc.allocateNodeCIDRs { - nc.unassignNodeCIDR(nodeID) - } - glog.Infof("Delete node from registry: %s", nodeID) - err = nc.kubeClient.Nodes().Delete(nodeID) - if err != nil { - glog.Errorf("Delete node %s error: %v", nodeID, err) - } - nc.deletePods(nodeID) - }(nodeID) - } - wg.Wait() - - return nil -} - -// populateAddresses queries Address for given list of nodes. 
-func (nc *NodeController) populateAddresses(nodes *api.NodeList) (*api.NodeList, error) { - if nc.isRunningCloudProvider() { - instances, ok := nc.cloud.Instances() - if !ok { - return nodes, ErrCloudInstance - } - for i := range nodes.Items { - node := &nodes.Items[i] - nodeAddresses, err := instances.NodeAddresses(node.Name) - if err != nil { - glog.Errorf("error getting instance addresses for %s: %v", node.Name, err) - } else { - node.Status.Addresses = nodeAddresses - } - } - } else { - for i := range nodes.Items { - node := &nodes.Items[i] - addr := net.ParseIP(node.Name) - if addr != nil { - address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} - node.Status.Addresses = []api.NodeAddress{address} - } else { - addrs, err := nc.lookupIP(node.Name) - if err != nil { - glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) - } else if len(addrs) == 0 { - glog.Errorf("No ip address for node %v", node.Name) - } else { - address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()} - node.Status.Addresses = []api.NodeAddress{address} - } - } - } - } - return nodes, nil -} - func (nc *NodeController) recordNodeEvent(node *api.Node, event string) { ref := &api.ObjectReference{ Kind: "Node", @@ -567,6 +364,9 @@ func (nc *NodeController) monitorNodeStatus() error { if err != nil { return err } + if nc.allocateNodeCIDRs { + nc.reconcilePodCIDRs(nodes) + } for i := range nodes.Items { var gracePeriod time.Duration var lastReadyCondition api.NodeCondition @@ -595,10 +395,12 @@ func (nc *NodeController) monitorNodeStatus() error { if lastReadyCondition.Status == api.ConditionFalse && nc.now().After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) { // Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node. - // Makes sure we are not removing pods from to many nodes in the same time. + // Makes sure we are not removing pods from too many nodes in the same time. glog.Infof("Evicting pods: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout) if nc.deletingPodsRateLimiter.CanAccept() { - nc.deletePods(node.Name) + if err := nc.deletePods(node.Name); err != nil { + glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) + } } } if lastReadyCondition.Status == api.ConditionUnknown && @@ -607,7 +409,9 @@ func (nc *NodeController) monitorNodeStatus() error { // need to substract monitoring grace period in order to get the real 'podEvictionTimeout'. glog.Infof("Evicting pods2: %v is later than %v + %v", nc.now(), nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod) if nc.deletingPodsRateLimiter.CanAccept() { - nc.deletePods(node.Name) + if err := nc.deletePods(node.Name); err != nil { + glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) + } } } @@ -621,71 +425,33 @@ func (nc *NodeController) monitorNodeStatus() error { if readyCondition.Status == api.ConditionUnknown && lastReadyCondition.Status != api.ConditionUnknown { nc.recordNodeEvent(node, "unknown") } + + // Check with the cloud provider to see if the node still exists. If it + // doesn't, delete the node and all pods scheduled on the node. 
+ if readyCondition.Status != api.ConditionTrue && nc.cloud != nil { + instances, ok := nc.cloud.Instances() + if !ok { + glog.Errorf("%v", ErrCloudInstance) + continue + } + if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound { + if nc.allocateNodeCIDRs { + nc.unassignNodeCIDR(node.Name) + } + if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil { + glog.Errorf("Unable to delete node %s: %v", node.Name, err) + continue + } + if err := nc.deletePods(node.Name); err != nil { + glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) + } + } + } } } return nil } -// getStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error -// occurs, an empty NodeList will be returned with a non-nil error info. The method only -// constructs spec fields for nodes. -func (nc *NodeController) getStaticNodesWithSpec() (*api.NodeList, error) { - result := &api.NodeList{} - for _, nodeID := range nc.nodes { - node := api.Node{ - ObjectMeta: api.ObjectMeta{Name: nodeID}, - Spec: api.NodeSpec{ - ExternalID: nodeID, - }, - Status: api.NodeStatus{ - Capacity: nc.staticResources.Capacity, - }, - } - result.Items = append(result.Items, node) - } - return result, nil -} - -// getCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error -// occurs, an empty NodeList will be returned with a non-nil error info. The method only -// constructs spec fields for nodes. -func (nc *NodeController) getCloudNodesWithSpec() (*api.NodeList, error) { - result := &api.NodeList{} - instances, ok := nc.cloud.Instances() - if !ok { - return result, ErrCloudInstance - } - matches, err := instances.List(nc.matchRE) - if err != nil { - return result, err - } - for i := range matches { - node := api.Node{} - node.Name = matches[i] - resources, err := instances.GetNodeResources(matches[i]) - if err != nil { - return nil, err - } - if resources == nil { - resources = nc.staticResources - } - if resources != nil { - node.Status.Capacity = resources.Capacity - if node.Status.Capacity != nil { - node.Status.Capacity[api.ResourcePods] = *resource.NewQuantity(0, resource.DecimalSI) - } - } - instanceID, err := instances.ExternalID(node.Name) - if err != nil { - glog.Errorf("Error getting instance id for %s: %v", node.Name, err) - } else { - node.Spec.ExternalID = instanceID - } - result.Items = append(result.Items, node) - } - return result, nil -} - // deletePods will delete all pods from master running on given node. func (nc *NodeController) deletePods(nodeID string) error { glog.V(2).Infof("Delete all pods from %v", nodeID) @@ -708,19 +474,6 @@ func (nc *NodeController) deletePods(nodeID string) error { return nil } -// isRunningCloudProvider checks if cluster is running with cloud provider. -func (nc *NodeController) isRunningCloudProvider() bool { - return nc.cloud != nil && len(nc.matchRE) > 0 -} - -// canonicalizeName takes a node list and lowercases all nodes' name. -func (nc *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { - for i := range nodes.Items { - nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name) - } - return nodes -} - // getCondition returns a condition object for the specific condition // type, nil if the condition is not set. 
func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition { diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go index 34e5b22fb54..d9ec66c84e2 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go @@ -19,7 +19,6 @@ package nodecontroller import ( "errors" "fmt" - "reflect" "sort" "sync" "testing" @@ -30,7 +29,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" - fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -142,506 +140,6 @@ func (m *FakeNodeHandler) Watch(label labels.Selector, field fields.Selector, re return nil, nil } -func TestRegisterNodes(t *testing.T) { - table := []struct { - fakeNodeHandler *FakeNodeHandler - machines []string - retryCount int - expectedRequestCount int - expectedCreateCount int - expectedFail bool - }{ - { - // Register two nodes normally. - machines: []string{"node0", "node1"}, - fakeNodeHandler: &FakeNodeHandler{ - CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true }, - }, - retryCount: 1, - expectedRequestCount: 2, - expectedCreateCount: 2, - expectedFail: false, - }, - { - // Canonicalize node names. - machines: []string{"NODE0", "node1"}, - fakeNodeHandler: &FakeNodeHandler{ - CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { - if node.Name == "NODE0" { - return false - } - return true - }, - }, - retryCount: 1, - expectedRequestCount: 2, - expectedCreateCount: 2, - expectedFail: false, - }, - { - // No machine to register. - machines: []string{}, - fakeNodeHandler: &FakeNodeHandler{ - CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { return true }, - }, - retryCount: 1, - expectedRequestCount: 0, - expectedCreateCount: 0, - expectedFail: false, - }, - { - // Fail the first two requests. - machines: []string{"node0", "node1"}, - fakeNodeHandler: &FakeNodeHandler{ - CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { - if fake.RequestCount == 0 || fake.RequestCount == 1 { - return false - } - return true - }, - }, - retryCount: 10, - expectedRequestCount: 4, - expectedCreateCount: 2, - expectedFail: false, - }, - { - // One node already exists - machines: []string{"node0", "node1"}, - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node1", - }, - }, - }, - }, - retryCount: 10, - expectedRequestCount: 2, - expectedCreateCount: 1, - expectedFail: false, - }, - { - // The first node always fails. 
- machines: []string{"node0", "node1"}, - fakeNodeHandler: &FakeNodeHandler{ - CreateHook: func(fake *FakeNodeHandler, node *api.Node) bool { - if node.Name == "node0" { - return false - } - return true - }, - }, - retryCount: 2, - expectedRequestCount: 3, // 2 for node0, 1 for node1 - expectedCreateCount: 1, - expectedFail: true, - }, - } - - for _, item := range table { - nodes := api.NodeList{} - for _, machine := range item.machines { - nodes.Items = append(nodes.Items, *newNode(machine)) - } - nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) - err := nodeController.registerNodes(&nodes, item.retryCount, time.Millisecond) - if !item.expectedFail && err != nil { - t.Errorf("unexpected error: %v", err) - } - if item.expectedFail && err == nil { - t.Errorf("unexpected non-error") - } - if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { - t.Errorf("expected %v calls, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) - } - if len(item.fakeNodeHandler.CreatedNodes) != item.expectedCreateCount { - t.Errorf("expected %v nodes, but got %v.", item.expectedCreateCount, item.fakeNodeHandler.CreatedNodes) - } - } -} - -func TestCreateGetStaticNodesWithSpec(t *testing.T) { - table := []struct { - machines []string - expectedNodes *api.NodeList - }{ - { - machines: []string{}, - expectedNodes: &api.NodeList{}, - }, - { - machines: []string{"node0"}, - expectedNodes: &api.NodeList{ - Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "node0"}, - Spec: api.NodeSpec{ - ExternalID: "node0", - }, - Status: api.NodeStatus{ - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceCPU): resource.MustParse("10"), - api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), - }, - }, - }, - }, - }, - }, - { - machines: []string{"node0", "node1"}, - expectedNodes: &api.NodeList{ - Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "node0"}, - Spec: api.NodeSpec{ - ExternalID: "node0", - }, - Status: api.NodeStatus{ - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceCPU): resource.MustParse("10"), - api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), - }, - }, - }, - { - ObjectMeta: api.ObjectMeta{Name: "node1"}, - Spec: api.NodeSpec{ - ExternalID: "node1", - }, - Status: api.NodeStatus{ - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceCPU): resource.MustParse("10"), - api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), - }, - }, - }, - }, - }, - }, - } - - resources := api.NodeResources{ - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceCPU): resource.MustParse("10"), - api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), - }, - } - for _, item := range table { - nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) - nodes, err := nodeController.getStaticNodesWithSpec() - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if !reflect.DeepEqual(item.expectedNodes, nodes) { - t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes) - } - } -} - -func TestCreateGetCloudNodesWithSpec(t *testing.T) { - resourceList := api.ResourceList{ - api.ResourceCPU: *resource.NewMilliQuantity(1000, resource.DecimalSI), - 
api.ResourceMemory: *resource.NewQuantity(3000, resource.DecimalSI), - } - - table := []struct { - fakeCloud *fake_cloud.FakeCloud - machines []string - expectedNodes *api.NodeList - }{ - { - fakeCloud: &fake_cloud.FakeCloud{}, - expectedNodes: &api.NodeList{}, - }, - { - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0"}, - NodeResources: &api.NodeResources{Capacity: resourceList}, - }, - expectedNodes: &api.NodeList{ - Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "node0"}, - Status: api.NodeStatus{Capacity: resourceList}, - }, - }, - }, - }, - { - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0", "node1"}, - NodeResources: &api.NodeResources{Capacity: resourceList}, - }, - expectedNodes: &api.NodeList{ - Items: []api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "node0"}, - Status: api.NodeStatus{Capacity: resourceList}, - }, - { - ObjectMeta: api.ObjectMeta{Name: "node1"}, - Status: api.NodeStatus{Capacity: resourceList}, - }, - }, - }, - }, - } - - for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) - nodes, err := nodeController.getCloudNodesWithSpec() - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if !reflect.DeepEqual(item.expectedNodes, nodes) { - t.Errorf("expected node list %+v, got %+v", item.expectedNodes, nodes) - } - } -} - -func TestSyncCloudNodes(t *testing.T) { - table := []struct { - fakeNodeHandler *FakeNodeHandler - fakeCloud *fake_cloud.FakeCloud - matchRE string - expectedRequestCount int - expectedNameCreated []string - expectedExtIDCreated []string - expectedAddrsCreated []string - expectedDeleted []string - }{ - { - // 1 existing node, 1 cloud nodes: do nothing. - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0")}, - }, - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0"}, - ExtID: map[string]string{ - "node0": "ext-node0", - "node1": "ext-node1", - }, - Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, - }, - matchRE: ".*", - expectedRequestCount: 1, // List - expectedNameCreated: []string{}, - expectedExtIDCreated: []string{}, - expectedAddrsCreated: []string{}, - expectedDeleted: []string{}, - }, - { - // 1 existing node, 2 cloud nodes: create 1. - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0")}, - }, - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0", "node1"}, - ExtID: map[string]string{ - "node0": "ext-node0", - "node1": "ext-node1", - }, - Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, - }, - matchRE: ".*", - expectedRequestCount: 2, // List + Create - expectedNameCreated: []string{"node1"}, - expectedExtIDCreated: []string{"ext-node1"}, - expectedAddrsCreated: []string{"1.2.3.4"}, - expectedDeleted: []string{}, - }, - { - // 2 existing nodes, 1 cloud node: delete 1. 
- fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0"), newNode("node1")}, - }, - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0"}, - ExtID: map[string]string{ - "node0": "ext-node0", - "node1": "ext-node1", - }, - Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, - }, - matchRE: ".*", - expectedRequestCount: 2, // List + Delete - expectedNameCreated: []string{}, - expectedExtIDCreated: []string{}, - expectedAddrsCreated: []string{}, - expectedDeleted: []string{"node1"}, - }, - { - // 1 existing node, 3 cloud nodes but only 2 match regex: delete 1. - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0")}, - }, - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0", "node1", "fake"}, - ExtID: map[string]string{ - "node0": "ext-node0", - "node1": "ext-node1", - "fake": "ext-fake", - }, - Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, - }, - matchRE: "node[0-9]+", - expectedRequestCount: 2, // List + Create - expectedNameCreated: []string{"node1"}, - expectedExtIDCreated: []string{"ext-node1"}, - expectedAddrsCreated: []string{"1.2.3.4"}, - expectedDeleted: []string{}, - }, - } - - for _, item := range table { - if item.fakeNodeHandler.Fake == nil { - item.fakeNodeHandler.Fake = testclient.NewSimpleFake() - } - nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) - if err := nodeController.syncCloudNodes(); err != nil { - t.Errorf("unexpected error: %v", err) - } - if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { - t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) - } - nodes := sortedNodeNames(item.fakeNodeHandler.CreatedNodes) - if !reflect.DeepEqual(item.expectedNameCreated, nodes) { - t.Errorf("expected node list %+v, got %+v", item.expectedNameCreated, nodes) - } - nodeExtIDs := sortedNodeExternalIDs(item.fakeNodeHandler.CreatedNodes) - if !reflect.DeepEqual(item.expectedExtIDCreated, nodeExtIDs) { - t.Errorf("expected node external id list %+v, got %+v", item.expectedExtIDCreated, nodeExtIDs) - } - nodeAddrs := sortedNodeAddresses(item.fakeNodeHandler.CreatedNodes) - if !reflect.DeepEqual(item.expectedAddrsCreated, nodeAddrs) { - t.Errorf("expected node address list %+v, got %+v", item.expectedAddrsCreated, nodeAddrs) - } - nodes = sortedNodeNames(item.fakeNodeHandler.DeletedNodes) - if !reflect.DeepEqual(item.expectedDeleted, nodes) { - t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes) - } - } -} - -func TestSyncCloudNodesEvictPods(t *testing.T) { - table := []struct { - fakeNodeHandler *FakeNodeHandler - fakeCloud *fake_cloud.FakeCloud - matchRE string - expectedRequestCount int - expectedDeleted []string - expectedActions []testclient.FakeAction - }{ - { - // No node to delete: do nothing. - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0"), newNode("node1")}, - Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), - }, - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0", "node1"}, - }, - matchRE: ".*", - expectedRequestCount: 1, // List - expectedDeleted: []string{}, - expectedActions: nil, - }, - { - // Delete node1, and pod0 is running on it. 
- fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0"), newNode("node1")}, - Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node1")}}), - }, - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0"}, - }, - matchRE: ".*", - expectedRequestCount: 2, // List + Delete - expectedDeleted: []string{"node1"}, - expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, - }, - { - // Delete node1, but pod0 is running on node0. - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0"), newNode("node1")}, - Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}), - }, - fakeCloud: &fake_cloud.FakeCloud{ - Machines: []string{"node0"}, - }, - matchRE: ".*", - expectedRequestCount: 2, // List + Delete - expectedDeleted: []string{"node1"}, - expectedActions: []testclient.FakeAction{{Action: "list-pods"}}, - }, - } - - for _, item := range table { - if item.fakeNodeHandler.Fake == nil { - item.fakeNodeHandler.Fake = testclient.NewSimpleFake() - } - nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) - if err := nodeController.syncCloudNodes(); err != nil { - t.Errorf("unexpected error: %v", err) - } - if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { - t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) - } - nodes := sortedNodeNames(item.fakeNodeHandler.DeletedNodes) - if !reflect.DeepEqual(item.expectedDeleted, nodes) { - t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes) - } - if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) { - t.Errorf("time out waiting for deleting pods, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions) - } - } -} - -func TestPopulateNodeAddresses(t *testing.T) { - table := []struct { - nodes *api.NodeList - fakeCloud *fake_cloud.FakeCloud - expectedFail bool - expectedAddresses []api.NodeAddress - }{ - { - nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}}, - fakeCloud: &fake_cloud.FakeCloud{Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}}, - expectedAddresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, - }, - }, - { - nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}}, - fakeCloud: &fake_cloud.FakeCloud{Err: ErrQueryIPAddress}, - expectedAddresses: nil, - }, - } - - for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) - result, err := nodeController.populateAddresses(item.nodes) - // In case of IP querying error, we should continue. 
- if err != nil { - t.Errorf("unexpected error: %v", err) - } - for _, node := range result.Items { - if !reflect.DeepEqual(item.expectedAddresses, node.Status.Addresses) { - t.Errorf("expect HostIP %s, got %s", item.expectedAddresses, node.Status.Addresses) - } - } - } -} - func TestMonitorNodeStatusEvictPods(t *testing.T) { fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) evictionTimeout := 10 * time.Minute @@ -826,7 +324,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, + nodeController := NewNodeController(nil, "", nil, item.fakeNodeHandler, 10, evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } @@ -1029,7 +527,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(), + nodeController := NewNodeController(nil, "", nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1320c531d18..e83ed8bfa12 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -31,6 +31,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -66,11 +67,6 @@ const ( // Max amount of time to wait for the container runtime to come up. maxWaitForContainerRuntime = 5 * time.Minute - // Initial node status update frequency and incremental frequency, for faster cluster startup. - // The update frequency will be increameted linearly, until it reaches status_update_frequency. - initialNodeStatusUpdateFrequency = 100 * time.Millisecond - nodeStatusUpdateFrequencyInc = 500 * time.Millisecond - // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. nodeStatusUpdateRetry = 5 @@ -122,6 +118,7 @@ func NewMainKubelet( pullBurst int, containerGCPolicy ContainerGCPolicy, sourcesReady SourcesReadyFn, + registerNode bool, clusterDomain string, clusterDNS net.IP, masterServiceNamespace string, @@ -224,6 +221,7 @@ func NewMainKubelet( readinessManager: readinessManager, httpClient: &http.Client{}, sourcesReady: sourcesReady, + registerNode: registerNode, clusterDomain: clusterDomain, clusterDNS: clusterDNS, serviceLister: serviceLister, @@ -373,6 +371,9 @@ type Kubelet struct { // cAdvisor used for container information. cadvisor cadvisor.Interface + // Set to true to have the node register itself with the apiserver. + registerNode bool + // If non-empty, use this for container DNS search. 
clusterDomain string @@ -657,26 +658,22 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { glog.Infof("Running in container %q", kl.resourceContainer) } - err := kl.imageManager.Start() - if err != nil { + if err := kl.imageManager.Start(); err != nil { kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ImageManager %v", err) glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err) } - err = kl.cadvisor.Start() - if err != nil { + if err := kl.cadvisor.Start(); err != nil { kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start CAdvisor %v", err) glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err) } - err = kl.containerManager.Start() - if err != nil { + if err := kl.containerManager.Start(); err != nil { kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start ContainerManager %v", err) glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err) } - err = kl.oomWatcher.Start(kl.nodeRef) - if err != nil { + if err := kl.oomWatcher.Start(kl.nodeRef); err != nil { kl.recorder.Eventf(kl.nodeRef, "kubeletSetupFailed", "Failed to start OOM watcher %v", err) glog.Errorf("Failed to start OOM watching: %v", err) } @@ -688,20 +685,83 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { kl.syncLoop(updates, kl) } +func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { + node := &api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: kl.hostname, + Labels: map[string]string{"kubernetes.io/hostname": kl.hostname}, + }, + } + if kl.cloud != nil { + instances, ok := kl.cloud.Instances() + if !ok { + return nil, fmt.Errorf("failed to get instances from cloud provider") + } + // TODO(roberthbailey): Can we do this without having credentials to talk + // to the cloud provider? + instanceID, err := instances.ExternalID(kl.hostname) + if err != nil { + return nil, fmt.Errorf("failed to get instance ID from cloud provider: %v", err) + } + node.Spec.ExternalID = instanceID + } else { + node.Spec.ExternalID = kl.hostname + } + if err := kl.setNodeStatus(node); err != nil { + return nil, err + } + return node, nil +} + +// registerWithApiserver registers the node with the cluster master. +func (kl *Kubelet) registerWithApiserver() { + step := 100 * time.Millisecond + for { + time.Sleep(step) + step = step * 2 + if step >= 7*time.Second { + step = 7 * time.Second + } + + node, err := kl.initialNodeStatus() + if err != nil { + glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) + continue + } + glog.V(2).Infof("Attempting to register node %s", node.Name) + if _, err := kl.kubeClient.Nodes().Create(node); err != nil { + if apierrors.IsAlreadyExists(err) { + currentNode, err := kl.kubeClient.Nodes().Get(kl.hostname) + if err != nil { + glog.Errorf("error getting node %q: %v", kl.hostname, err) + continue + } + if currentNode == nil { + glog.Errorf("no node instance returned for %q", kl.hostname) + continue + } + if currentNode.Spec.ExternalID == node.Spec.ExternalID { + glog.Infof("Node %s was previously registered", node.Name) + return + } + } + glog.V(2).Infof("Unable to register %s with the apiserver: %v", node.Name, err) + continue + } + glog.Infof("Successfully registered node %s", node.Name) + return + } +} + // syncNodeStatus periodically synchronizes node status to master. 
func (kl *Kubelet) syncNodeStatus() { if kl.kubeClient == nil { return } - glog.Infof("Starting node status updates") - for feq := initialNodeStatusUpdateFrequency; feq < kl.nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc { - select { - case <-time.After(feq): - if err := kl.updateNodeStatus(); err != nil { - glog.Errorf("Unable to update node status: %v", err) - } - } + if kl.registerNode { + kl.registerWithApiserver() } + glog.Infof("Starting node status updates") for { select { case <-time.After(kl.nodeStatusUpdateFrequency): @@ -1687,14 +1747,13 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error { // updateNodeStatus updates node status to master with retries. func (kl *Kubelet) updateNodeStatus() error { for i := 0; i < nodeStatusUpdateRetry; i++ { - err := kl.tryUpdateNodeStatus() - if err != nil { - glog.Errorf("error updating node status, will retry: %v", err) + if err := kl.tryUpdateNodeStatus(); err != nil { + glog.Errorf("Error updating node status, will retry: %v", err) } else { return nil } } - return fmt.Errorf("Update node status exceeds retry count") + return fmt.Errorf("update node status exceeds retry count") } func (kl *Kubelet) recordNodeOnlineEvent() { @@ -1718,15 +1777,36 @@ func (kl *Kubelet) recordNodeUnschedulableEvent() { // Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus() var oldNodeUnschedulable bool -// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 -// is set, this function will also confirm that cbr0 is configured correctly. -func (kl *Kubelet) tryUpdateNodeStatus() error { - node, err := kl.kubeClient.Nodes().Get(kl.hostname) - if err != nil { - return fmt.Errorf("error getting node %q: %v", kl.hostname, err) - } - if node == nil { - return fmt.Errorf("no node instance returned for %q", kl.hostname) +// setNodeStatus fills in the Status fields of the given Node, overwriting +// any fields that are currently set. +func (kl *Kubelet) setNodeStatus(node *api.Node) error { + // Set addresses for the node. + if kl.cloud != nil { + instances, ok := kl.cloud.Instances() + if !ok { + return fmt.Errorf("failed to get instances from cloud provider") + } + // TODO(roberthbailey): Can we do this without having credentials to talk + // to the cloud provider? + nodeAddresses, err := instances.NodeAddresses(kl.hostname) + if err != nil { + return fmt.Errorf("failed to get node address from cloud provider: %v", err) + } + node.Status.Addresses = nodeAddresses + } else { + addr := net.ParseIP(kl.hostname) + if addr != nil { + node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addr.String()}} + } else { + addrs, err := net.LookupIP(node.Name) + if err != nil { + return fmt.Errorf("can't get ip address of node %s: %v", node.Name, err) + } else if len(addrs) == 0 { + return fmt.Errorf("no ip address for node %v", node.Name) + } else { + node.Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}} + } + } } networkConfigured := true @@ -1741,7 +1821,13 @@ func (kl *Kubelet) tryUpdateNodeStatus() error { // cAdvisor locally, e.g. for test-cmd.sh, and in integration test. info, err := kl.GetCachedMachineInfo() if err != nil { - glog.Errorf("error getting machine info: %v", err) + // TODO(roberthbailey): This is required for test-cmd.sh to pass. + // See if the test should be updated instead. 
+ node.Status.Capacity = api.ResourceList{ + api.ResourceCPU: *resource.NewMilliQuantity(0, resource.DecimalSI), + api.ResourceMemory: resource.MustParse("0Gi"), + } + glog.Errorf("Error getting machine info: %v", err) } else { node.Status.NodeInfo.MachineID = info.MachineID node.Status.NodeInfo.SystemUUID = info.SystemUUID @@ -1760,7 +1846,7 @@ func (kl *Kubelet) tryUpdateNodeStatus() error { verinfo, err := kl.cadvisor.VersionInfo() if err != nil { - glog.Errorf("error getting version info: %v", err) + glog.Errorf("Error getting version info: %v", err) } else { node.Status.NodeInfo.KernelVersion = verinfo.KernelVersion node.Status.NodeInfo.OsImage = verinfo.ContainerOsVersion @@ -1828,7 +1914,22 @@ func (kl *Kubelet) tryUpdateNodeStatus() error { } oldNodeUnschedulable = node.Spec.Unschedulable } + return nil +} +// tryUpdateNodeStatus tries to update node status to master. If ReconcileCBR0 +// is set, this function will also confirm that cbr0 is configured correctly. +func (kl *Kubelet) tryUpdateNodeStatus() error { + node, err := kl.kubeClient.Nodes().Get(kl.hostname) + if err != nil { + return fmt.Errorf("error getting node %q: %v", kl.hostname, err) + } + if node == nil { + return fmt.Errorf("no node instance returned for %q", kl.hostname) + } + if err := kl.setNodeStatus(node); err != nil { + return err + } // Update the current status on the API server _, err = kl.kubeClient.Nodes().UpdateStatus(node) return err diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index c7f0a733b8b..7e6713a95a0 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -34,6 +34,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" @@ -44,6 +45,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/network" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/version" @@ -3239,7 +3241,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { kubelet := testKubelet.kubelet kubeClient := testKubelet.fakeKubeClient kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: "testnode"}}, + {ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}}, }}).ReactFn machineInfo := &cadvisorApi.MachineInfo{ MachineID: "123", @@ -3257,7 +3259,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { } mockCadvisor.On("VersionInfo").Return(versionInfo, nil) expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: "testnode"}, + ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}, Spec: api.NodeSpec{}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -3284,6 +3286,7 @@ func TestUpdateNewNodeStatus(t *testing.T) { api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}}, }, } @@ -3317,7 +3320,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { kubeClient := testKubelet.fakeKubeClient kubeClient.ReactFn = 
testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ { - ObjectMeta: api.ObjectMeta{Name: "testnode"}, + ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}, Spec: api.NodeSpec{}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -3353,7 +3356,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { } mockCadvisor.On("VersionInfo").Return(versionInfo, nil) expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: "testnode"}, + ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}, Spec: api.NodeSpec{}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -3380,6 +3383,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) { api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}}, }, } @@ -3419,7 +3423,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { fakeDocker.VersionInfo = []string{} kubeClient.ReactFn = testclient.NewSimpleFake(&api.NodeList{Items: []api.Node{ - {ObjectMeta: api.ObjectMeta{Name: "testnode"}}, + {ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}}, }}).ReactFn mockCadvisor := testKubelet.fakeCadvisor machineInfo := &cadvisorApi.MachineInfo{ @@ -3438,7 +3442,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { mockCadvisor.On("VersionInfo").Return(versionInfo, nil) expectedNode := &api.Node{ - ObjectMeta: api.ObjectMeta{Name: "testnode"}, + ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}, Spec: api.NodeSpec{}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ @@ -3465,6 +3469,7 @@ func TestUpdateNodeStatusWithoutContainerRuntime(t *testing.T) { api.ResourceMemory: *resource.NewQuantity(1024, resource.BinarySI), api.ResourcePods: *resource.NewQuantity(0, resource.DecimalSI), }, + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.1"}}, }, } @@ -4402,6 +4407,62 @@ func TestFilterOutTerminatedPods(t *testing.T) { } } +func TestRegisterExistingNodeWithApiserver(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + kubelet.hostname = "127.0.0.1" + kubeClient := testKubelet.fakeKubeClient + kubeClient.ReactFn = func(action testclient.FakeAction) (runtime.Object, error) { + segments := strings.Split(action.Action, "-") + if len(segments) < 2 { + return nil, fmt.Errorf("unrecognized action, need two or three segments - or --: %s", action.Action) + } + verb := segments[0] + switch verb { + case "create": + // Return an error on create. + return &api.Node{}, &apierrors.StatusError{ + ErrStatus: api.Status{Reason: api.StatusReasonAlreadyExists}, + } + case "get": + // Return an existing (matching) node on get. 
+ return &api.Node{ + ObjectMeta: api.ObjectMeta{Name: "127.0.0.1"}, + Spec: api.NodeSpec{ExternalID: "127.0.0.1"}, + }, nil + default: + return nil, fmt.Errorf("no reaction implemented for %s", action.Action) + } + } + machineInfo := &cadvisorApi.MachineInfo{ + MachineID: "123", + SystemUUID: "abc", + BootID: "1b3", + NumCores: 2, + MemoryCapacity: 1024, + } + mockCadvisor := testKubelet.fakeCadvisor + mockCadvisor.On("MachineInfo").Return(machineInfo, nil) + versionInfo := &cadvisorApi.VersionInfo{ + KernelVersion: "3.16.0-0.bpo.4-amd64", + ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", + DockerVersion: "1.5.0", + } + mockCadvisor.On("VersionInfo").Return(versionInfo, nil) + + done := make(chan struct{}) + go func() { + kubelet.registerWithApiserver() + done <- struct{}{} + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("timed out waiting for registration") + case <-done: + return + } +} + func TestMakePortMappings(t *testing.T) { tests := []struct { container *api.Container
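
Taken together, the patch moves node registration out of the node controller and into the kubelet: registerWithApiserver (added to pkg/kubelet/kubelet.go above) retries Nodes().Create with a backoff that starts at 100ms, doubles, and is capped at 7 seconds, and treats an already-existing node whose Spec.ExternalID matches as a successful earlier registration. The standalone sketch below mirrors that loop against an in-memory fake; Node, nodeClient, and fakeNodes are simplified stand-ins for the real api.Node and client types, not code from the patch.

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // Node keeps only the fields the registration loop cares about.
    type Node struct {
        Name       string
        ExternalID string
    }

    // errAlreadyExists plays the role of apierrors.IsAlreadyExists in the patch.
    var errAlreadyExists = errors.New("node already exists")

    // nodeClient is a hypothetical subset of the apiserver node client.
    type nodeClient interface {
        Create(n *Node) error
        Get(name string) (*Node, error)
    }

    // registerWithApiserver mirrors the kubelet's new retry loop: back off
    // exponentially (100ms doubling, capped at 7s) and stop once the node is
    // created, or once an existing node with the same ExternalID is found.
    func registerWithApiserver(c nodeClient, node *Node) {
        step := 100 * time.Millisecond
        for {
            time.Sleep(step)
            if step *= 2; step >= 7*time.Second {
                step = 7 * time.Second
            }
            err := c.Create(node)
            if err == nil {
                fmt.Printf("successfully registered node %s\n", node.Name)
                return
            }
            if errors.Is(err, errAlreadyExists) {
                if current, getErr := c.Get(node.Name); getErr == nil && current != nil &&
                    current.ExternalID == node.ExternalID {
                    fmt.Printf("node %s was previously registered\n", node.Name)
                    return
                }
            }
            fmt.Printf("unable to register %s, will retry: %v\n", node.Name, err)
        }
    }

    // fakeNodes is an in-memory stand-in for the apiserver.
    type fakeNodes struct{ byName map[string]*Node }

    func (f *fakeNodes) Create(n *Node) error {
        if _, ok := f.byName[n.Name]; ok {
            return errAlreadyExists
        }
        f.byName[n.Name] = n
        return nil
    }

    func (f *fakeNodes) Get(name string) (*Node, error) { return f.byName[name], nil }

    func main() {
        store := &fakeNodes{byName: map[string]*Node{}}
        n := &Node{Name: "node-1", ExternalID: "ext-node-1"}
        registerWithApiserver(store, n) // first attempt creates the node
        registerWithApiserver(store, n) // second attempt hits AlreadyExists and matches ExternalID
    }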
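
On the controller side, CIDR allocation no longer happens at registration time: monitorNodeStatus now calls reconcilePodCIDRs on every sync when --allocate-node-cidrs is set, removing in-use CIDRs from the candidate set and then assigning a free one to each node that lacks a PodCIDR. The sketch below follows that two-pass shape with plain Go types; the /24 carve-out beyond the b1 := byte(i >> 8) line shown above is an assumption, and the cloud-provider configure and Nodes().Update steps are reduced to a comment.

    package sketch

    import (
        "fmt"
        "log"
        "net"
    )

    // Node is a minimal stand-in for api.Node.
    type Node struct {
        Name    string
        PodCIDR string
    }

    // generatePodCIDRs carves num /24 subnets out of the cluster CIDR by bumping
    // the second and third octets (assumed layout, matching the visible byte(i>>8) step).
    func generatePodCIDRs(clusterCIDR *net.IPNet, num int) map[string]bool {
        out := make(map[string]bool, num)
        base := clusterCIDR.IP.To4()
        for i := 0; i < num; i++ {
            cidr := fmt.Sprintf("%d.%d.%d.0/24", base[0], base[1]+byte(i>>8), base[2]+byte(i%256))
            out[cidr] = true
        }
        return out
    }

    // reconcilePodCIDRs mirrors the controller's two passes: first drop CIDRs
    // that nodes already hold, then hand a free CIDR to every node without one.
    func reconcilePodCIDRs(nodes []*Node, clusterCIDR *net.IPNet) {
        available := generatePodCIDRs(clusterCIDR, len(nodes))
        for _, n := range nodes {
            if n.PodCIDR != "" {
                delete(available, n.PodCIDR)
            }
        }
        for _, n := range nodes {
            if n.PodCIDR != "" {
                continue
            }
            var picked string
            for cidr := range available {
                picked = cidr
                break
            }
            if picked == "" {
                log.Printf("no available CIDR for node %s", n.Name)
                continue
            }
            delete(available, picked)
            n.PodCIDR = picked
            // The real controller now pushes the CIDR to the cloud provider
            // (configureNodeCIDR) and only persists it with Nodes().Update on success.
        }
    }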
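
With syncCloudNodes removed, stale nodes are cleaned up from monitorNodeStatus instead: for a node that is not Ready, the controller asks the cloud provider for its ExternalID and, when the new cloudprovider.InstanceNotFound sentinel comes back (the GCE provider maps an HTTP 404 to it), deletes the node object and then its pods. A condensed sketch of that branch, with a hypothetical instances interface standing in for cloudprovider.Instances:

    package sketch

    import (
        "errors"
        "log"
    )

    // errInstanceNotFound plays the role of the new cloudprovider.InstanceNotFound sentinel.
    var errInstanceNotFound = errors.New("instance not found")

    // instances is a hypothetical slice of the cloudprovider.Instances interface.
    type instances interface {
        ExternalID(name string) (string, error)
    }

    // pruneDeletedNode mirrors the new monitorNodeStatus branch: if the backing
    // instance is gone, delete the node object first and then its pods. The
    // patch compares the sentinel with ==; errors.Is is used here for the same effect.
    func pruneDeletedNode(inst instances, nodeName string, deleteNode, deletePods func(string) error) {
        if _, err := inst.ExternalID(nodeName); !errors.Is(err, errInstanceNotFound) {
            return // instance still exists, or the lookup failed for another reason
        }
        if err := deleteNode(nodeName); err != nil {
            log.Printf("unable to delete node %s: %v", nodeName, err)
            return
        }
        if err := deletePods(nodeName); err != nil {
            log.Printf("unable to delete pods from node %s: %v", nodeName, err)
        }
    }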
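
The address lookup the controller's populateAddresses used to do now lives in the kubelet's setNodeStatus: use the cloud provider's NodeAddresses when a cloud is configured, otherwise treat a hostname that parses as an IP as the address itself, otherwise fall back to a DNS lookup and take the first result. A reduced sketch of that fallback chain; the cloudLookup parameter is a placeholder for Instances.NodeAddresses, not the real interface:

    package sketch

    import (
        "fmt"
        "net"
    )

    // nodeAddresses mirrors the kubelet's new fallback order: cloud provider,
    // then a literal IP hostname, then DNS, taking the first resolved address.
    func nodeAddresses(cloudLookup func(host string) ([]string, error), hostname string) ([]string, error) {
        if cloudLookup != nil {
            return cloudLookup(hostname)
        }
        if ip := net.ParseIP(hostname); ip != nil {
            return []string{ip.String()}, nil
        }
        addrs, err := net.LookupIP(hostname)
        if err != nil {
            return nil, fmt.Errorf("can't get ip address of node %s: %v", hostname, err)
        }
        if len(addrs) == 0 {
            return nil, fmt.Errorf("no ip address for node %s", hostname)
        }
        return []string{addrs[0].String()}, nil
    }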