diff --git a/cluster/gce/config-default.sh b/cluster/gce/config-default.sh index 1466aa96e03..a6b37174932 100755 --- a/cluster/gce/config-default.sh +++ b/cluster/gce/config-default.sh @@ -37,48 +37,7 @@ MASTER_NAME="${INSTANCE_PREFIX}-master" MASTER_TAG="${INSTANCE_PREFIX}-master" MINION_TAG="${INSTANCE_PREFIX}-minion" MASTER_IP_RANGE="${MASTER_IP_RANGE:-10.246.0.0/24}" - -# Compute IP addresses for nodes. -function increment_ipv4 { - local ip_base=$1 - local incr_amount=$2 - local -a ip_components - local ip_regex="([0-9]+).([0-9]+).([0-9]+).([0-9]+)" - [[ $ip_base =~ $ip_regex ]] - ip_components=("${BASH_REMATCH[1]}" "${BASH_REMATCH[2]}" "${BASH_REMATCH[3]}" "${BASH_REMATCH[4]}") - ip_dec=0 - local comp - for comp in "${ip_components[@]}"; do - ip_dec=$((ip_dec<<8)) - ip_dec=$((ip_dec + $comp)) - done - - ip_dec=$((ip_dec + $incr_amount)) - - ip_components=() - local i - for ((i=0; i < 4; i++)); do - comp=$((ip_dec & 0xFF)) - ip_components+=($comp) - ip_dec=$((ip_dec>>8)) - done - echo "${ip_components[3]}.${ip_components[2]}.${ip_components[1]}.${ip_components[0]}" -} - -node_count="${NUM_MINIONS}" -next_node="${KUBE_GCE_CLUSTER_CLASS_B:-10.244}.0.0" -node_subnet_size=24 -node_subnet_count=$((2 ** (32-$node_subnet_size))) -subnets=() - -for ((node_num=0; node_num 5 )); then - echo -e "${color_red}Failed to create route $1 ${color_norm}" - exit 2 - fi - echo -e "${color_yellow}Attempt $(($attempt+1)) failed to create route $1. Retrying.${color_norm}" - attempt=$(($attempt+1)) - else - break - fi - done -} - # Robustly try to create an instance template. # $1: The name of the instance template. # $2: The scopes flag. @@ -569,23 +544,6 @@ function kube-up { # to gcloud's deficiency. wait-for-minions-to-run detect-minion-names - - # Create the routes and set IP ranges to instance metadata, 5 instances at a time. - for (( i=0; i<${#MINION_NAMES[@]}; i++)); do - create-route "${MINION_NAMES[$i]}" "${MINION_IP_RANGES[$i]}" & - add-instance-metadata "${MINION_NAMES[$i]}" "node-ip-range=${MINION_IP_RANGES[$i]}" & - - if [ $i -ne 0 ] && [ $((i%5)) -eq 0 ]; then - echo Waiting for a batch of routes at $i... - wait-for-jobs - fi - - done - create-route "${MASTER_NAME}" "${MASTER_IP_RANGE}" - - # Wait for last batch of jobs. - wait-for-jobs - detect-master echo "Waiting for cluster initialization." diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 9bb60ef287f..d3696343813 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -973,3 +973,11 @@ func (aws *AWSCloud) DeleteVolume(volumeName string) error { } return awsDisk.delete() } + +func (v *AWSCloud) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (v *AWSCloud) Release(name string) error { + return nil +} diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 303644c39ae..4075fc2b35d 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -80,6 +80,10 @@ type Instances interface { List(filter string) ([]string, error) // GetNodeResources gets the resources for a particular node GetNodeResources(name string) (*api.NodeResources, error) + // Configure the specified instance using the spec + Configure(name string, spec *api.NodeSpec) error + // Delete all the configuration related to the instance, including other cloud resources + Release(name string) error } // Zone represents the location of a particular machine. diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index c87bee69c6f..cdabd2a0365 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -159,3 +159,13 @@ func (f *FakeCloud) GetNodeResources(name string) (*api.NodeResources, error) { f.addCall("get-node-resources") return f.NodeResources, f.Err } + +func (f *FakeCloud) Configure(name string, spec *api.NodeSpec) error { + f.addCall("configure") + return f.Err +} + +func (f *FakeCloud) Release(name string) error { + f.addCall("release") + return f.Err +} diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index c595937c3ec..891674a621d 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -17,6 +17,7 @@ limitations under the License. package gce_cloud import ( + "errors" "fmt" "io" "io/ioutil" @@ -42,6 +43,9 @@ import ( "google.golang.org/cloud/compute/metadata" ) +var ErrMetadataConflict = errors.New("Metadata already set at the same key") +const podCIDRMetadataKey string = "node-ip-range" + // GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine. type GCECloud struct { service *compute.Service @@ -49,6 +53,7 @@ type GCECloud struct { projectID string zone string instanceID string + networkName string // Used for accessing the metadata server metadataAccess func(string) (string, error) @@ -113,6 +118,18 @@ func getInstanceID() (string, error) { return parts[0], nil } +func getNetworkName() (string, error) { + result, err := metadata.Get("instance/network-interfaces/0/network") + if err != nil { + return "", err + } + parts := strings.Split(result, "/") + if len(parts) != 4 { + return "", fmt.Errorf("unexpected response: %s", result) + } + return parts[3], nil +} + // newGCECloud creates a new instance of GCECloud. func newGCECloud(config io.Reader) (*GCECloud, error) { projectID, zone, err := getProjectAndZone() @@ -126,6 +143,10 @@ func newGCECloud(config io.Reader) (*GCECloud, error) { if err != nil { return nil, err } + networkName, err := getNetworkName() + if err != nil { + return nil, err + } tokenSource := google.ComputeTokenSource("") if config != nil { var cfg Config @@ -152,6 +173,7 @@ func newGCECloud(config io.Reader) (*GCECloud, error) { projectID: projectID, zone: zone, instanceID: instanceID, + networkName: networkName, metadataAccess: getMetadata, }, nil } @@ -217,12 +239,12 @@ func (gce *GCECloud) targetPoolURL(name, region string) string { return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) } -func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error { +func waitForOp(op *compute.Operation, getOperation func() (*compute.Operation, error)) error { pollOp := op for pollOp.Status != "DONE" { var err error time.Sleep(time.Second) - pollOp, err = gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do() + pollOp, err = getOperation() if err != nil { return err } @@ -234,6 +256,25 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error } } return nil + +} + +func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error { + return waitForOp(op, func() (*compute.Operation, error) { + return gce.service.GlobalOperations.Get(gce.projectID, op.Name).Do() + }) +} + +func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error { + return waitForOp(op, func() (*compute.Operation, error) { + return gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do() + }) +} + +func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error { + return waitForOp(op, func() (*compute.Operation, error) { + return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, op.Name).Do() + }) } // GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer @@ -506,6 +547,66 @@ func (gce *GCECloud) GetNodeResources(name string) (*api.NodeResources, error) { } } +func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) { + for _, item := range metadata.Items { + if item.Key == key { + return item.Value, true + } + } + return "", false +} + +func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error { + instanceName := canonicalizeInstanceName(name) + instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, instanceName).Do() + if err != nil { + return err + } + if currentValue, ok := getMetadataValue(instance.Metadata, podCIDRMetadataKey); ok { + if currentValue == spec.PodCIDR { + // IP range already set to proper value. + return nil + } + return ErrMetadataConflict + } + instance.Metadata.Items = append(instance.Metadata.Items, + &compute.MetadataItems{ + Key: podCIDRMetadataKey, + Value: spec.PodCIDR, + }) + setMetadataCall := gce.service.Instances.SetMetadata(gce.projectID, gce.zone, instanceName, instance.Metadata) + setMetadataOp, err := setMetadataCall.Do() + if err != nil { + return err + } + err = gce.waitForZoneOp(setMetadataOp) + if err != nil { + return err + } + insertCall := gce.service.Routes.Insert(gce.projectID, &compute.Route{ + Name: instanceName, + DestRange: spec.PodCIDR, + NextHopInstance: fmt.Sprintf("zones/%s/instances/%s", gce.zone, instanceName), + Network: fmt.Sprintf("global/networks/%s", gce.networkName), + Priority: 1000, + }) + insertOp, err := insertCall.Do() + if err != nil { + return err + } + return gce.waitForGlobalOp(insertOp) +} + +func (gce *GCECloud) Release(name string) error { + instanceName := canonicalizeInstanceName(name) + deleteCall := gce.service.Routes.Delete(gce.projectID, instanceName) + deleteOp, err := deleteCall.Do() + if err != nil { + return err + } + return gce.waitForGlobalOp(deleteOp) +} + func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) { region, err := getGceRegion(gce.zone) if err != nil { diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go index 8f6658b3fbe..ec13fb873ea 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "strings" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -134,6 +135,61 @@ func NewNodeController( } } +// Generates num pod CIDRs that could be assigned to nodes. +func generateCIDRs(num int) util.StringSet { + res := util.NewStringSet() + for i := 0; i < num; i++ { + // TODO: Make the CIDRs configurable. + res.Insert(fmt.Sprintf("10.244.%v.0/24", i)) + } + return res +} + +// For each node from newNodes, finds its current spec in registeredNodes. +// If it is not there, it gets a new valid CIDR assigned. +func reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList { + registeredCIDRs := make(map[string]string) + availableCIDRs := generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items)) + for _, node := range registeredNodes.Items { + registeredCIDRs[node.Name] = node.Spec.PodCIDR + availableCIDRs.Delete(node.Spec.PodCIDR) + } + for i, node := range newNodes.Items { + podCIDR, registered := registeredCIDRs[node.Name] + if !registered { + podCIDR, _ = availableCIDRs.PopAny() + } + newNodes.Items[i].Spec.PodCIDR = podCIDR + } + return newNodes +} + +func (nc *NodeController) configureNodeCIDR(node *api.Node) { + instances, ok := nc.cloud.Instances() + if !ok { + glog.Errorf("Error configuring node %s: CloudProvider does not support Instances()", node.Name) + return + } + err := instances.Configure(node.Name, &node.Spec) + if err != nil { + glog.Errorf("Error configuring node %s: %s", node.Name, err) + // The newly assigned CIDR was not properly configured, so don't save it in the API server. + node.Spec.PodCIDR = "" + } +} + +func (nc *NodeController) unassignNodeCIDR(nodeName string) { + instances, ok := nc.cloud.Instances() + if !ok { + glog.Errorf("Error deconfiguring node %s: CloudProvider does not support Instances()", nodeName) + return + } + err := instances.Release(nodeName) + if err != nil { + glog.Errorf("Error deconfiguring node %s: %s", nodeName, err) + } +} + // Run creates initial node list and start syncing instances from cloudprovider, if any. // It also starts syncing or monitoring cluster node status. // 1. registerNodes() is called only once to register all initial nodes (from cloudprovider @@ -164,6 +220,9 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) { if nodes, err = nc.populateAddresses(nodes); err != nil { glog.Errorf("Error getting nodes ips: %v", err) } + if nc.isRunningCloudProvider() { + reconcilePodCIDRs(nodes, &api.NodeList{}) + } if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil { glog.Errorf("Error registering node list %+v: %v", nodes, err) } @@ -194,21 +253,30 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret registered := util.NewStringSet() nodes = nc.canonicalizeName(nodes) for i := 0; i < retryCount; i++ { - for _, node := range nodes.Items { - if registered.Has(node.Name) { - continue - } - _, err := nc.kubeClient.Nodes().Create(&node) - if err == nil || apierrors.IsAlreadyExists(err) { - registered.Insert(node.Name) - glog.Infof("Registered node in registry: %s", node.Name) - } else { - glog.Errorf("Error registering node %s, retrying: %s", node.Name, err) - } - if registered.Len() == len(nodes.Items) { - glog.Infof("Successfully registered all nodes") - return nil - } + var wg sync.WaitGroup + wg.Add(len(nodes.Items)) + for i := range nodes.Items { + go func(node *api.Node) { + defer wg.Done() + if registered.Has(node.Name) { + return + } + if nc.isRunningCloudProvider() { + nc.configureNodeCIDR(node) + } + _, err := nc.kubeClient.Nodes().Create(node) + if err == nil || apierrors.IsAlreadyExists(err) { + registered.Insert(node.Name) + glog.Infof("Registered node in registry: %s", node.Name) + } else { + glog.Errorf("Error registering node %s, retrying: %s", node.Name, err) + } + }(&nodes.Items[i]) + } + wg.Wait() + if registered.Len() == len(nodes.Items) { + glog.Infof("Successfully registered all nodes") + return nil } time.Sleep(retryInterval) } @@ -234,39 +302,51 @@ func (nc *NodeController) syncCloudNodes() error { node := nodes.Items[i] nodeMap[node.Name] = &node } - + reconcilePodCIDRs(matches, nodes) + var wg sync.WaitGroup + wg.Add(len(matches.Items)) // Create nodes which have been created in cloud, but not in kubernetes cluster // Skip nodes if we hit an error while trying to get their addresses. - for _, node := range matches.Items { - if _, ok := nodeMap[node.Name]; !ok { - glog.V(3).Infof("Querying addresses for new node: %s", node.Name) - nodeList := &api.NodeList{} - nodeList.Items = []api.Node{node} - _, err = nc.populateAddresses(nodeList) - if err != nil { - glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err) - continue + for i := range matches.Items { + go func(node *api.Node) { + defer wg.Done() + if _, ok := nodeMap[node.Name]; !ok { + glog.V(3).Infof("Querying addresses for new node: %s", node.Name) + nodeList := &api.NodeList{} + nodeList.Items = []api.Node{*node} + _, err = nc.populateAddresses(nodeList) + if err != nil { + glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err) + return + } + node.Status.Addresses = nodeList.Items[0].Status.Addresses + nc.configureNodeCIDR(node) + glog.Infof("Create node in registry: %s", node.Name) + _, err = nc.kubeClient.Nodes().Create(node) + if err != nil { + glog.Errorf("Create node %s error: %v", node.Name, err) + } } - node.Status.Addresses = nodeList.Items[0].Status.Addresses - - glog.Infof("Create node in registry: %s", node.Name) - _, err = nc.kubeClient.Nodes().Create(&node) - if err != nil { - glog.Errorf("Create node %s error: %v", node.Name, err) - } - } - delete(nodeMap, node.Name) + delete(nodeMap, node.Name) + }(&matches.Items[i]) } + wg.Wait() + wg.Add(len(nodeMap)) // Delete nodes which have been deleted from cloud, but not from kubernetes cluster. for nodeID := range nodeMap { - glog.Infof("Delete node from registry: %s", nodeID) - err = nc.kubeClient.Nodes().Delete(nodeID) - if err != nil { - glog.Errorf("Delete node %s error: %v", nodeID, err) - } - nc.deletePods(nodeID) + go func(nodeID string) { + defer wg.Done() + nc.unassignNodeCIDR(nodeID) + glog.Infof("Delete node from registry: %s", nodeID) + err = nc.kubeClient.Nodes().Delete(nodeID) + if err != nil { + glog.Errorf("Delete node %s error: %v", nodeID, err) + } + nc.deletePods(nodeID) + }(nodeID) } + wg.Wait() return nil } diff --git a/pkg/cloudprovider/openstack/openstack.go b/pkg/cloudprovider/openstack/openstack.go index 139618370d2..f4a8aa0ef8c 100644 --- a/pkg/cloudprovider/openstack/openstack.go +++ b/pkg/cloudprovider/openstack/openstack.go @@ -389,6 +389,14 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { return rsrc, nil } +func (i *Instances) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (i *Instances) Release(name string) error { + return nil +} + func (os *OpenStack) Clusters() (cloudprovider.Clusters, bool) { return nil, false } diff --git a/pkg/cloudprovider/ovirt/ovirt.go b/pkg/cloudprovider/ovirt/ovirt.go index 0f566e1b6c6..68156c5252e 100644 --- a/pkg/cloudprovider/ovirt/ovirt.go +++ b/pkg/cloudprovider/ovirt/ovirt.go @@ -250,3 +250,11 @@ func (v *OVirtCloud) List(filter string) ([]string, error) { func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) { return nil, nil } + +func (v *OVirtCloud) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (v *OVirtCloud) Release(name string) error { + return nil +} diff --git a/pkg/cloudprovider/rackspace/rackspace.go b/pkg/cloudprovider/rackspace/rackspace.go index c3682e85880..8bc8f5c6338 100644 --- a/pkg/cloudprovider/rackspace/rackspace.go +++ b/pkg/cloudprovider/rackspace/rackspace.go @@ -395,6 +395,14 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { return rsrc, nil } +func (i *Instances) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (i *Instances) Release(name string) error { + return nil +} + func (os *Rackspace) Clusters() (cloudprovider.Clusters, bool) { return nil, false } diff --git a/pkg/cloudprovider/vagrant/vagrant.go b/pkg/cloudprovider/vagrant/vagrant.go index 4a35d990902..fa45180ac9b 100644 --- a/pkg/cloudprovider/vagrant/vagrant.go +++ b/pkg/cloudprovider/vagrant/vagrant.go @@ -239,3 +239,11 @@ func (v *VagrantCloud) List(filter string) ([]string, error) { func (v *VagrantCloud) GetNodeResources(name string) (*api.NodeResources, error) { return nil, nil } + +func (v *VagrantCloud) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (v *VagrantCloud) Release(name string) error { + return nil +} diff --git a/pkg/util/set.go b/pkg/util/set.go index 5deb4ee3ce9..1b3f9d6d839 100644 --- a/pkg/util/set.go +++ b/pkg/util/set.go @@ -139,6 +139,15 @@ func (s StringSet) List() []string { return res } +// Returns a single element from the set. +func (s StringSet) PopAny() (string, bool) { + for key := range s { + s.Delete(key) + return key, true + } + return "", false +} + // Len returns the size of the set. func (s StringSet) Len() int { return len(s)