From 290c7b94ef319213df3280b2f82b1d45ad587565 Mon Sep 17 00:00:00 2001 From: Tomek Kulczynski Date: Tue, 24 Feb 2015 13:32:44 +0100 Subject: [PATCH 1/3] Make nodecontroller configure nodes' pod IP ranges --- cluster/gce/config-default.sh | 41 ----- cluster/gce/config-test.sh | 1 - cluster/gce/util.sh | 42 ----- pkg/cloudprovider/aws/aws.go | 8 + pkg/cloudprovider/cloud.go | 4 + pkg/cloudprovider/fake/fake.go | 10 ++ pkg/cloudprovider/gce/gce.go | 105 +++++++++++- .../nodecontroller/nodecontroller.go | 160 +++++++++++++----- pkg/cloudprovider/openstack/openstack.go | 8 + pkg/cloudprovider/ovirt/ovirt.go | 8 + pkg/cloudprovider/rackspace/rackspace.go | 8 + pkg/cloudprovider/vagrant/vagrant.go | 8 + pkg/util/set.go | 9 + 13 files changed, 286 insertions(+), 126 deletions(-) diff --git a/cluster/gce/config-default.sh b/cluster/gce/config-default.sh index 1466aa96e03..a6b37174932 100755 --- a/cluster/gce/config-default.sh +++ b/cluster/gce/config-default.sh @@ -37,48 +37,7 @@ MASTER_NAME="${INSTANCE_PREFIX}-master" MASTER_TAG="${INSTANCE_PREFIX}-master" MINION_TAG="${INSTANCE_PREFIX}-minion" MASTER_IP_RANGE="${MASTER_IP_RANGE:-10.246.0.0/24}" - -# Compute IP addresses for nodes. -function increment_ipv4 { - local ip_base=$1 - local incr_amount=$2 - local -a ip_components - local ip_regex="([0-9]+).([0-9]+).([0-9]+).([0-9]+)" - [[ $ip_base =~ $ip_regex ]] - ip_components=("${BASH_REMATCH[1]}" "${BASH_REMATCH[2]}" "${BASH_REMATCH[3]}" "${BASH_REMATCH[4]}") - ip_dec=0 - local comp - for comp in "${ip_components[@]}"; do - ip_dec=$((ip_dec<<8)) - ip_dec=$((ip_dec + $comp)) - done - - ip_dec=$((ip_dec + $incr_amount)) - - ip_components=() - local i - for ((i=0; i < 4; i++)); do - comp=$((ip_dec & 0xFF)) - ip_components+=($comp) - ip_dec=$((ip_dec>>8)) - done - echo "${ip_components[3]}.${ip_components[2]}.${ip_components[1]}.${ip_components[0]}" -} - -node_count="${NUM_MINIONS}" -next_node="${KUBE_GCE_CLUSTER_CLASS_B:-10.244}.0.0" -node_subnet_size=24 -node_subnet_count=$((2 ** (32-$node_subnet_size))) -subnets=() - -for ((node_num=0; node_num 5 )); then - echo -e "${color_red}Failed to create route $1 ${color_norm}" - exit 2 - fi - echo -e "${color_yellow}Attempt $(($attempt+1)) failed to create route $1. Retrying.${color_norm}" - attempt=$(($attempt+1)) - else - break - fi - done -} - # Robustly try to create an instance template. # $1: The name of the instance template. # $2: The scopes flag. @@ -569,23 +544,6 @@ function kube-up { # to gcloud's deficiency. wait-for-minions-to-run detect-minion-names - - # Create the routes and set IP ranges to instance metadata, 5 instances at a time. - for (( i=0; i<${#MINION_NAMES[@]}; i++)); do - create-route "${MINION_NAMES[$i]}" "${MINION_IP_RANGES[$i]}" & - add-instance-metadata "${MINION_NAMES[$i]}" "node-ip-range=${MINION_IP_RANGES[$i]}" & - - if [ $i -ne 0 ] && [ $((i%5)) -eq 0 ]; then - echo Waiting for a batch of routes at $i... - wait-for-jobs - fi - - done - create-route "${MASTER_NAME}" "${MASTER_IP_RANGE}" - - # Wait for last batch of jobs. - wait-for-jobs - detect-master echo "Waiting for cluster initialization." 
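The shell logic removed above pre-computed one /24 per minion by treating the IPv4 base address as a 32-bit integer and carving subnets out of ${KUBE_GCE_CLUSTER_CLASS_B}.0.0; the node-controller and cloud-provider changes in the rest of this patch take over that job. As a rough Go rendering of the removed arithmetic (a standalone sketch, not code from this patch; the incrementIPv4 name and the example values are illustrative):

package main

import (
    "encoding/binary"
    "fmt"
    "net"
)

// incrementIPv4 mirrors the removed increment_ipv4 bash helper: treat the four
// octets as one big-endian 32-bit integer and add an offset.
func incrementIPv4(base net.IP, incr uint32) net.IP {
    v := binary.BigEndian.Uint32(base.To4())
    out := make(net.IP, 4)
    binary.BigEndian.PutUint32(out, v+incr)
    return out
}

func main() {
    // config-default.sh carved one /24 per minion out of the cluster base,
    // i.e. subnet i starts 2^(32-24) = 256 addresses past the base address.
    base := net.ParseIP("10.244.0.0")
    const subnetSize = 24
    step := uint32(1) << (32 - subnetSize)
    for i := uint32(0); i < 3; i++ {
        fmt.Printf("%s/%d\n", incrementIPv4(base, i*step), subnetSize)
    }
    // Prints 10.244.0.0/24, 10.244.1.0/24, 10.244.2.0/24.
}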
diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 9bb60ef287f..d3696343813 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -973,3 +973,11 @@ func (aws *AWSCloud) DeleteVolume(volumeName string) error { } return awsDisk.delete() } + +func (v *AWSCloud) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (v *AWSCloud) Release(name string) error { + return nil +} diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 303644c39ae..4075fc2b35d 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -80,6 +80,10 @@ type Instances interface { List(filter string) ([]string, error) // GetNodeResources gets the resources for a particular node GetNodeResources(name string) (*api.NodeResources, error) + // Configure the specified instance using the spec + Configure(name string, spec *api.NodeSpec) error + // Delete all the configuration related to the instance, including other cloud resources + Release(name string) error } // Zone represents the location of a particular machine. diff --git a/pkg/cloudprovider/fake/fake.go b/pkg/cloudprovider/fake/fake.go index c87bee69c6f..cdabd2a0365 100644 --- a/pkg/cloudprovider/fake/fake.go +++ b/pkg/cloudprovider/fake/fake.go @@ -159,3 +159,13 @@ func (f *FakeCloud) GetNodeResources(name string) (*api.NodeResources, error) { f.addCall("get-node-resources") return f.NodeResources, f.Err } + +func (f *FakeCloud) Configure(name string, spec *api.NodeSpec) error { + f.addCall("configure") + return f.Err +} + +func (f *FakeCloud) Release(name string) error { + f.addCall("release") + return f.Err +} diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index c595937c3ec..891674a621d 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -17,6 +17,7 @@ limitations under the License. package gce_cloud import ( + "errors" "fmt" "io" "io/ioutil" @@ -42,6 +43,9 @@ import ( "google.golang.org/cloud/compute/metadata" ) +var ErrMetadataConflict = errors.New("Metadata already set at the same key") +const podCIDRMetadataKey string = "node-ip-range" + // GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine. type GCECloud struct { service *compute.Service @@ -49,6 +53,7 @@ type GCECloud struct { projectID string zone string instanceID string + networkName string // Used for accessing the metadata server metadataAccess func(string) (string, error) @@ -113,6 +118,18 @@ func getInstanceID() (string, error) { return parts[0], nil } +func getNetworkName() (string, error) { + result, err := metadata.Get("instance/network-interfaces/0/network") + if err != nil { + return "", err + } + parts := strings.Split(result, "/") + if len(parts) != 4 { + return "", fmt.Errorf("unexpected response: %s", result) + } + return parts[3], nil +} + // newGCECloud creates a new instance of GCECloud. 
func newGCECloud(config io.Reader) (*GCECloud, error) { projectID, zone, err := getProjectAndZone() @@ -126,6 +143,10 @@ func newGCECloud(config io.Reader) (*GCECloud, error) { if err != nil { return nil, err } + networkName, err := getNetworkName() + if err != nil { + return nil, err + } tokenSource := google.ComputeTokenSource("") if config != nil { var cfg Config @@ -152,6 +173,7 @@ func newGCECloud(config io.Reader) (*GCECloud, error) { projectID: projectID, zone: zone, instanceID: instanceID, + networkName: networkName, metadataAccess: getMetadata, }, nil } @@ -217,12 +239,12 @@ func (gce *GCECloud) targetPoolURL(name, region string) string { return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) } -func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error { +func waitForOp(op *compute.Operation, getOperation func() (*compute.Operation, error)) error { pollOp := op for pollOp.Status != "DONE" { var err error time.Sleep(time.Second) - pollOp, err = gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do() + pollOp, err = getOperation() if err != nil { return err } @@ -234,6 +256,25 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error } } return nil + +} + +func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error { + return waitForOp(op, func() (*compute.Operation, error) { + return gce.service.GlobalOperations.Get(gce.projectID, op.Name).Do() + }) +} + +func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error { + return waitForOp(op, func() (*compute.Operation, error) { + return gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do() + }) +} + +func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error { + return waitForOp(op, func() (*compute.Operation, error) { + return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, op.Name).Do() + }) } // GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer @@ -506,6 +547,66 @@ func (gce *GCECloud) GetNodeResources(name string) (*api.NodeResources, error) { } } +func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) { + for _, item := range metadata.Items { + if item.Key == key { + return item.Value, true + } + } + return "", false +} + +func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error { + instanceName := canonicalizeInstanceName(name) + instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, instanceName).Do() + if err != nil { + return err + } + if currentValue, ok := getMetadataValue(instance.Metadata, podCIDRMetadataKey); ok { + if currentValue == spec.PodCIDR { + // IP range already set to proper value. 
+ return nil + } + return ErrMetadataConflict + } + instance.Metadata.Items = append(instance.Metadata.Items, + &compute.MetadataItems{ + Key: podCIDRMetadataKey, + Value: spec.PodCIDR, + }) + setMetadataCall := gce.service.Instances.SetMetadata(gce.projectID, gce.zone, instanceName, instance.Metadata) + setMetadataOp, err := setMetadataCall.Do() + if err != nil { + return err + } + err = gce.waitForZoneOp(setMetadataOp) + if err != nil { + return err + } + insertCall := gce.service.Routes.Insert(gce.projectID, &compute.Route{ + Name: instanceName, + DestRange: spec.PodCIDR, + NextHopInstance: fmt.Sprintf("zones/%s/instances/%s", gce.zone, instanceName), + Network: fmt.Sprintf("global/networks/%s", gce.networkName), + Priority: 1000, + }) + insertOp, err := insertCall.Do() + if err != nil { + return err + } + return gce.waitForGlobalOp(insertOp) +} + +func (gce *GCECloud) Release(name string) error { + instanceName := canonicalizeInstanceName(name) + deleteCall := gce.service.Routes.Delete(gce.projectID, instanceName) + deleteOp, err := deleteCall.Do() + if err != nil { + return err + } + return gce.waitForGlobalOp(deleteOp) +} + func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) { region, err := getGceRegion(gce.zone) if err != nil { diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go index 8f6658b3fbe..ec13fb873ea 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go @@ -21,6 +21,7 @@ import ( "fmt" "net" "strings" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -134,6 +135,61 @@ func NewNodeController( } } +// Generates num pod CIDRs that could be assigned to nodes. +func generateCIDRs(num int) util.StringSet { + res := util.NewStringSet() + for i := 0; i < num; i++ { + // TODO: Make the CIDRs configurable. + res.Insert(fmt.Sprintf("10.244.%v.0/24", i)) + } + return res +} + +// For each node from newNodes, finds its current spec in registeredNodes. +// If it is not there, it gets a new valid CIDR assigned. +func reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList { + registeredCIDRs := make(map[string]string) + availableCIDRs := generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items)) + for _, node := range registeredNodes.Items { + registeredCIDRs[node.Name] = node.Spec.PodCIDR + availableCIDRs.Delete(node.Spec.PodCIDR) + } + for i, node := range newNodes.Items { + podCIDR, registered := registeredCIDRs[node.Name] + if !registered { + podCIDR, _ = availableCIDRs.PopAny() + } + newNodes.Items[i].Spec.PodCIDR = podCIDR + } + return newNodes +} + +func (nc *NodeController) configureNodeCIDR(node *api.Node) { + instances, ok := nc.cloud.Instances() + if !ok { + glog.Errorf("Error configuring node %s: CloudProvider does not support Instances()", node.Name) + return + } + err := instances.Configure(node.Name, &node.Spec) + if err != nil { + glog.Errorf("Error configuring node %s: %s", node.Name, err) + // The newly assigned CIDR was not properly configured, so don't save it in the API server. 
+ node.Spec.PodCIDR = "" + } +} + +func (nc *NodeController) unassignNodeCIDR(nodeName string) { + instances, ok := nc.cloud.Instances() + if !ok { + glog.Errorf("Error deconfiguring node %s: CloudProvider does not support Instances()", nodeName) + return + } + err := instances.Release(nodeName) + if err != nil { + glog.Errorf("Error deconfiguring node %s: %s", nodeName, err) + } +} + // Run creates initial node list and start syncing instances from cloudprovider, if any. // It also starts syncing or monitoring cluster node status. // 1. registerNodes() is called only once to register all initial nodes (from cloudprovider @@ -164,6 +220,9 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) { if nodes, err = nc.populateAddresses(nodes); err != nil { glog.Errorf("Error getting nodes ips: %v", err) } + if nc.isRunningCloudProvider() { + reconcilePodCIDRs(nodes, &api.NodeList{}) + } if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil { glog.Errorf("Error registering node list %+v: %v", nodes, err) } @@ -194,21 +253,30 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret registered := util.NewStringSet() nodes = nc.canonicalizeName(nodes) for i := 0; i < retryCount; i++ { - for _, node := range nodes.Items { - if registered.Has(node.Name) { - continue - } - _, err := nc.kubeClient.Nodes().Create(&node) - if err == nil || apierrors.IsAlreadyExists(err) { - registered.Insert(node.Name) - glog.Infof("Registered node in registry: %s", node.Name) - } else { - glog.Errorf("Error registering node %s, retrying: %s", node.Name, err) - } - if registered.Len() == len(nodes.Items) { - glog.Infof("Successfully registered all nodes") - return nil - } + var wg sync.WaitGroup + wg.Add(len(nodes.Items)) + for i := range nodes.Items { + go func(node *api.Node) { + defer wg.Done() + if registered.Has(node.Name) { + return + } + if nc.isRunningCloudProvider() { + nc.configureNodeCIDR(node) + } + _, err := nc.kubeClient.Nodes().Create(node) + if err == nil || apierrors.IsAlreadyExists(err) { + registered.Insert(node.Name) + glog.Infof("Registered node in registry: %s", node.Name) + } else { + glog.Errorf("Error registering node %s, retrying: %s", node.Name, err) + } + }(&nodes.Items[i]) + } + wg.Wait() + if registered.Len() == len(nodes.Items) { + glog.Infof("Successfully registered all nodes") + return nil } time.Sleep(retryInterval) } @@ -234,39 +302,51 @@ func (nc *NodeController) syncCloudNodes() error { node := nodes.Items[i] nodeMap[node.Name] = &node } - + reconcilePodCIDRs(matches, nodes) + var wg sync.WaitGroup + wg.Add(len(matches.Items)) // Create nodes which have been created in cloud, but not in kubernetes cluster // Skip nodes if we hit an error while trying to get their addresses. 
- for _, node := range matches.Items { - if _, ok := nodeMap[node.Name]; !ok { - glog.V(3).Infof("Querying addresses for new node: %s", node.Name) - nodeList := &api.NodeList{} - nodeList.Items = []api.Node{node} - _, err = nc.populateAddresses(nodeList) - if err != nil { - glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err) - continue + for i := range matches.Items { + go func(node *api.Node) { + defer wg.Done() + if _, ok := nodeMap[node.Name]; !ok { + glog.V(3).Infof("Querying addresses for new node: %s", node.Name) + nodeList := &api.NodeList{} + nodeList.Items = []api.Node{*node} + _, err = nc.populateAddresses(nodeList) + if err != nil { + glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err) + return + } + node.Status.Addresses = nodeList.Items[0].Status.Addresses + nc.configureNodeCIDR(node) + glog.Infof("Create node in registry: %s", node.Name) + _, err = nc.kubeClient.Nodes().Create(node) + if err != nil { + glog.Errorf("Create node %s error: %v", node.Name, err) + } } - node.Status.Addresses = nodeList.Items[0].Status.Addresses - - glog.Infof("Create node in registry: %s", node.Name) - _, err = nc.kubeClient.Nodes().Create(&node) - if err != nil { - glog.Errorf("Create node %s error: %v", node.Name, err) - } - } - delete(nodeMap, node.Name) + delete(nodeMap, node.Name) + }(&matches.Items[i]) } + wg.Wait() + wg.Add(len(nodeMap)) // Delete nodes which have been deleted from cloud, but not from kubernetes cluster. for nodeID := range nodeMap { - glog.Infof("Delete node from registry: %s", nodeID) - err = nc.kubeClient.Nodes().Delete(nodeID) - if err != nil { - glog.Errorf("Delete node %s error: %v", nodeID, err) - } - nc.deletePods(nodeID) + go func(nodeID string) { + defer wg.Done() + nc.unassignNodeCIDR(nodeID) + glog.Infof("Delete node from registry: %s", nodeID) + err = nc.kubeClient.Nodes().Delete(nodeID) + if err != nil { + glog.Errorf("Delete node %s error: %v", nodeID, err) + } + nc.deletePods(nodeID) + }(nodeID) } + wg.Wait() return nil } diff --git a/pkg/cloudprovider/openstack/openstack.go b/pkg/cloudprovider/openstack/openstack.go index 139618370d2..f4a8aa0ef8c 100644 --- a/pkg/cloudprovider/openstack/openstack.go +++ b/pkg/cloudprovider/openstack/openstack.go @@ -389,6 +389,14 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { return rsrc, nil } +func (i *Instances) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (i *Instances) Release(name string) error { + return nil +} + func (os *OpenStack) Clusters() (cloudprovider.Clusters, bool) { return nil, false } diff --git a/pkg/cloudprovider/ovirt/ovirt.go b/pkg/cloudprovider/ovirt/ovirt.go index 0f566e1b6c6..68156c5252e 100644 --- a/pkg/cloudprovider/ovirt/ovirt.go +++ b/pkg/cloudprovider/ovirt/ovirt.go @@ -250,3 +250,11 @@ func (v *OVirtCloud) List(filter string) ([]string, error) { func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) { return nil, nil } + +func (v *OVirtCloud) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (v *OVirtCloud) Release(name string) error { + return nil +} diff --git a/pkg/cloudprovider/rackspace/rackspace.go b/pkg/cloudprovider/rackspace/rackspace.go index c3682e85880..8bc8f5c6338 100644 --- a/pkg/cloudprovider/rackspace/rackspace.go +++ b/pkg/cloudprovider/rackspace/rackspace.go @@ -395,6 +395,14 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { return rsrc, nil } +func (i *Instances) Configure(name 
string, spec *api.NodeSpec) error { + return nil +} + +func (i *Instances) Release(name string) error { + return nil +} + func (os *Rackspace) Clusters() (cloudprovider.Clusters, bool) { return nil, false } diff --git a/pkg/cloudprovider/vagrant/vagrant.go b/pkg/cloudprovider/vagrant/vagrant.go index 4a35d990902..fa45180ac9b 100644 --- a/pkg/cloudprovider/vagrant/vagrant.go +++ b/pkg/cloudprovider/vagrant/vagrant.go @@ -239,3 +239,11 @@ func (v *VagrantCloud) List(filter string) ([]string, error) { func (v *VagrantCloud) GetNodeResources(name string) (*api.NodeResources, error) { return nil, nil } + +func (v *VagrantCloud) Configure(name string, spec *api.NodeSpec) error { + return nil +} + +func (v *VagrantCloud) Release(name string) error { + return nil +} diff --git a/pkg/util/set.go b/pkg/util/set.go index 5deb4ee3ce9..1b3f9d6d839 100644 --- a/pkg/util/set.go +++ b/pkg/util/set.go @@ -139,6 +139,15 @@ func (s StringSet) List() []string { return res } +// Returns a single element from the set. +func (s StringSet) PopAny() (string, bool) { + for key := range s { + s.Delete(key) + return key, true + } + return "", false +} + // Len returns the size of the set. func (s StringSet) Len() int { return len(s) From e967ffd5225ad4bfd91507156c505c591184447d Mon Sep 17 00:00:00 2001 From: Jerzy Szczepkowski Date: Tue, 28 Apr 2015 17:02:45 +0200 Subject: [PATCH 2/3] Added flag to set cluster class B network address for pods, added flag to disable allocation of CIDRs for pods. Fixed synchronization bug in NodeController registerNodes(). --- cluster/gce/config-default.sh | 1 + cluster/gce/configure-vm.sh | 2 + cluster/gce/coreos/helper.sh | 2 + cluster/gce/debian/helper.sh | 2 + cluster/gce/util.sh | 2 + .../kube-controller-manager.manifest | 9 +- cmd/integration/integration.go | 2 +- .../app/controllermanager.go | 10 ++- cmd/kubernetes/kubernetes.go | 2 +- pkg/cloudprovider/gce/gce.go | 6 +- .../nodecontroller/nodecontroller.go | 86 +++++++++++-------- .../nodecontroller/nodecontroller_test.go | 26 ++++-- 12 files changed, 97 insertions(+), 53 deletions(-) diff --git a/cluster/gce/config-default.sh b/cluster/gce/config-default.sh index a6b37174932..3098151c1f4 100755 --- a/cluster/gce/config-default.sh +++ b/cluster/gce/config-default.sh @@ -42,6 +42,7 @@ MINION_SCOPES=("storage-ro" "compute-rw" "https://www.googleapis.com/auth/monito # Increase the sleep interval value if concerned about API rate limits. 3, in seconds, is the default. POLL_SLEEP_INTERVAL=3 PORTAL_NET="10.0.0.0/16" +ALLOCATE_NODE_CIDRS=true # When set to true, Docker Cache is enabled by default as part of the cluster bring up.
ENABLE_DOCKER_REGISTRY_CACHE=true diff --git a/cluster/gce/configure-vm.sh b/cluster/gce/configure-vm.sh index 459ea16c976..91c0adaf93f 100644 --- a/cluster/gce/configure-vm.sh +++ b/cluster/gce/configure-vm.sh @@ -234,6 +234,8 @@ function create-salt-pillar() { cat </srv/salt-overlay/pillar/cluster-params.sls instance_prefix: '$(echo "$INSTANCE_PREFIX" | sed -e "s/'/''/g")' node_instance_prefix: '$(echo "$NODE_INSTANCE_PREFIX" | sed -e "s/'/''/g")' +cluster_class_b: '$(echo "$KUBE_GCE_CLUSTER_CLASS_B" | sed -e "s/'/''/g")' +allocate_node_cidrs: '$(echo "$ALLOCATE_NODE_CIDRS" | sed -e "s/'/''/g")' portal_net: '$(echo "$PORTAL_NET" | sed -e "s/'/''/g")' enable_cluster_monitoring: '$(echo "$ENABLE_CLUSTER_MONITORING" | sed -e "s/'/''/g")' enable_node_monitoring: '$(echo "$ENABLE_NODE_MONITORING" | sed -e "s/'/''/g")' diff --git a/cluster/gce/coreos/helper.sh b/cluster/gce/coreos/helper.sh index 92fe690fe42..6db9beb28ab 100644 --- a/cluster/gce/coreos/helper.sh +++ b/cluster/gce/coreos/helper.sh @@ -28,9 +28,11 @@ function build-kube-env { ENV_TIMESTAMP: $(yaml-quote $(date -u +%Y-%m-%dT%T%z)) INSTANCE_PREFIX: $(yaml-quote ${INSTANCE_PREFIX}) NODE_INSTANCE_PREFIX: $(yaml-quote ${NODE_INSTANCE_PREFIX}) +KUBE_GCE_CLUSTER_CLASS_B: $(yaml-quote ${KUBE_GCE_CLUSTER_CLASS_B:-10.244}) SERVER_BINARY_TAR_URL: $(yaml-quote ${SERVER_BINARY_TAR_URL}) SALT_TAR_URL: $(yaml-quote ${SALT_TAR_URL}) PORTAL_NET: $(yaml-quote ${PORTAL_NET}) +ALLOCATE_NODE_CIDRS: $(yaml-quote ${ALLOCATE_NODE_CIDRS:-false}) ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-false}) ENABLE_NODE_MONITORING: $(yaml-quote ${ENABLE_NODE_MONITORING:-false}) ENABLE_CLUSTER_LOGGING: $(yaml-quote ${ENABLE_CLUSTER_LOGGING:-false}) diff --git a/cluster/gce/debian/helper.sh b/cluster/gce/debian/helper.sh index 7a33a84a5d6..8551d821e52 100644 --- a/cluster/gce/debian/helper.sh +++ b/cluster/gce/debian/helper.sh @@ -26,9 +26,11 @@ function build-kube-env { ENV_TIMESTAMP: $(yaml-quote $(date -u +%Y-%m-%dT%T%z)) INSTANCE_PREFIX: $(yaml-quote ${INSTANCE_PREFIX}) NODE_INSTANCE_PREFIX: $(yaml-quote ${NODE_INSTANCE_PREFIX}) +KUBE_GCE_CLUSTER_CLASS_B: $(yaml-quote ${KUBE_GCE_CLUSTER_CLASS_B:-10.244}) SERVER_BINARY_TAR_URL: $(yaml-quote ${SERVER_BINARY_TAR_URL}) SALT_TAR_URL: $(yaml-quote ${SALT_TAR_URL}) PORTAL_NET: $(yaml-quote ${PORTAL_NET}) +ALLOCATE_NODE_CIDRS: $(yaml-quote ${ALLOCATE_NODE_CIDRS:-false}) ENABLE_CLUSTER_MONITORING: $(yaml-quote ${ENABLE_CLUSTER_MONITORING:-false}) ENABLE_NODE_MONITORING: $(yaml-quote ${ENABLE_NODE_MONITORING:-false}) ENABLE_CLUSTER_LOGGING: $(yaml-quote ${ENABLE_CLUSTER_LOGGING:-false}) diff --git a/cluster/gce/util.sh b/cluster/gce/util.sh index 8cc0b0935d1..70dfb33eb3f 100755 --- a/cluster/gce/util.sh +++ b/cluster/gce/util.sh @@ -32,6 +32,8 @@ fi NODE_INSTANCE_PREFIX="${INSTANCE_PREFIX}-minion" +ALLOCATE_NODE_CIDRS=true + KUBE_PROMPT_FOR_UPDATE=y KUBE_SKIP_UPDATE=${KUBE_SKIP_UPDATE-"n"} diff --git a/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest b/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest index cab49fb4f7c..eba84089489 100644 --- a/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest +++ b/cluster/saltbase/salt/kube-controller-manager/kube-controller-manager.manifest @@ -1,5 +1,6 @@ {% set machines = ""-%} {% set cluster_name = "" -%} +{% set cluster_class_b = "" -%} {% set minion_regexp = "--minion_regexp=.*" -%} {% set sync_nodes = "--sync_nodes=true" -%} @@ -9,6 +10,12 @@ {% if 
pillar['instance_prefix'] is defined -%} {% set cluster_name = "--cluster_name=" + pillar['instance_prefix'] -%} {% endif -%} +{% if pillar['cluster_class_b'] is defined -%} + {% set cluster_class_b = "--cluster-class-b=" + pillar['cluster_class_b'] -%} +{% endif -%} +{% if pillar['allocate_node_cidrs'] is defined -%} + {% set allocate_node_cidrs = "--allocate-node-cidrs=" + pillar['allocate_node_cidrs'] -%} +{% endif -%} {% set cloud_provider = "" -%} {% set cloud_config = "" -%} @@ -47,7 +54,7 @@ {% endif -%} {% endif -%} -{% set params = "--master=127.0.0.1:8080" + " " + machines + " " + cluster_name + " " + minion_regexp + " " + cloud_provider + " " + sync_nodes + " " + cloud_config + " " + pillar['log_level'] -%} +{% set params = "--master=127.0.0.1:8080" + " " + machines + " " + cluster_name + " " + cluster_class_b + " " + allocate_node_cidrs + " " + minion_regexp + " " + cloud_provider + " " + sync_nodes + " " + cloud_config + " " + pillar['log_level'] -%} { "apiVersion": "v1beta3", diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 776160b3b6b..9749a1f688e 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -225,7 +225,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st api.ResourceName(api.ResourceMemory): resource.MustParse("10G"), }} - nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "") + nodeController := nodecontroller.NewNodeController(nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "", "", false) nodeController.Run(5*time.Second, true) cadvisorInterface := new(cadvisor.Fake) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 825aa4f52c0..74f2078fcbf 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -78,8 +78,10 @@ type CMServer struct { NodeMilliCPU int64 NodeMemory resource.Quantity - ClusterName string - EnableProfiling bool + ClusterName string + ClusterClassB string + AllocateNodeCIDRs bool + EnableProfiling bool Master string Kubeconfig string @@ -145,6 +147,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.Var(resource.NewQuantityFlagValue(&s.NodeMemory), "node-memory", "The amount of memory (in bytes) provisioned on each node") fs.StringVar(&s.ClusterName, "cluster-name", s.ClusterName, "The instance prefix for the cluster") fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/") + fs.StringVar(&s.ClusterClassB, "cluster-class-b", "10.244", "Class B network address for Pods in cluster.") + fs.BoolVar(&s.AllocateNodeCIDRs, "allocate-node-cidrs", false, "Should CIDRs for Pods be allocated and set on the cloud provider.") fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)") fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.") } @@ -226,7 +230,7 @@ func (s *CMServer) Run(_ []string) error { nodeController := nodecontroller.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, s.RegisterRetryCount, s.PodEvictionTimeout, 
util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), - s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName) + s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName, s.ClusterClassB, s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) serviceController := servicecontroller.New(cloud, kubeClient, s.ClusterName) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 4826f587395..559460473a9 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -132,7 +132,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, const nodeSyncPeriod = 10 * time.Second nodeController := nodecontroller.NewNodeController( - nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "") + nil, "", machineList, nodeResources, cl, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "", "", false) nodeController.Run(nodeSyncPeriod, true) serviceController := servicecontroller.New(nil, cl, "kubernetes") diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index 891674a621d..f6057f3902c 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -53,7 +53,9 @@ type GCECloud struct { projectID string zone string instanceID string - networkName string + + // We assume here that nodes and master are in the same network. TODO(cjcullen) Fix it. + networkName string // Used for accessing the metadata server metadataAccess func(string) (string, error) @@ -243,6 +245,7 @@ func waitForOp(op *compute.Operation, getOperation func() (*compute.Operation, e pollOp := op for pollOp.Status != "DONE" { var err error + // TODO: add some backoff here. time.Sleep(time.Second) pollOp, err = getOperation() if err != nil { @@ -569,6 +572,7 @@ func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error { } return ErrMetadataConflict } + // We are setting the metadata, so they can be picked-up by the configure-vm.sh script to start docker with the given CIDR for Pods. instance.Metadata.Items = append(instance.Metadata.Items, &compute.MetadataItems{ Key: podCIDRMetadataKey, diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go index ec13fb873ea..1af4c958df5 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go @@ -22,6 +22,7 @@ import ( "net" "strings" "sync" + "sync/atomic" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -88,6 +89,8 @@ type NodeController struct { // TODO: Change node status monitor to watch based. nodeMonitorPeriod time.Duration clusterName string + clusterClassB string + allocateNodeCIDRs bool // Method for easy mocking in unittest. 
lookupIP func(host string) ([]net.IP, error) now func() util.Time @@ -106,7 +109,9 @@ func NewNodeController( nodeMonitorGracePeriod time.Duration, nodeStartupGracePeriod time.Duration, nodeMonitorPeriod time.Duration, - clusterName string) *NodeController { + clusterName string, + clusterClassB string, + allocateNodeCIDRs bool) *NodeController { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}) if kubeClient != nil { @@ -132,24 +137,26 @@ func NewNodeController( lookupIP: net.LookupIP, now: util.Now, clusterName: clusterName, + clusterClassB: clusterClassB, + allocateNodeCIDRs: allocateNodeCIDRs, } } // Generates num pod CIDRs that could be assigned to nodes. -func generateCIDRs(num int) util.StringSet { +func (nc *NodeController) generateCIDRs(num int) util.StringSet { res := util.NewStringSet() for i := 0; i < num; i++ { // TODO: Make the CIDRs configurable. - res.Insert(fmt.Sprintf("10.244.%v.0/24", i)) + res.Insert(fmt.Sprintf("%v.%v.0/24", nc.clusterClassB, i)) } return res } // For each node from newNodes, finds its current spec in registeredNodes. // If it is not there, it gets a new valid CIDR assigned. -func reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList { +func (nc *NodeController) reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList { registeredCIDRs := make(map[string]string) - availableCIDRs := generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items)) + availableCIDRs := nc.generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items)) for _, node := range registeredNodes.Items { registeredCIDRs[node.Name] = node.Spec.PodCIDR availableCIDRs.Delete(node.Spec.PodCIDR) @@ -220,8 +227,8 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) { if nodes, err = nc.populateAddresses(nodes); err != nil { glog.Errorf("Error getting nodes ips: %v", err) } - if nc.isRunningCloudProvider() { - reconcilePodCIDRs(nodes, &api.NodeList{}) + if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs { + nc.reconcilePodCIDRs(nodes, &api.NodeList{}) } if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil { glog.Errorf("Error registering node list %+v: %v", nodes, err) @@ -249,38 +256,37 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret if len(nodes.Items) == 0 { return nil } - - registered := util.NewStringSet() nodes = nc.canonicalizeName(nodes) - for i := 0; i < retryCount; i++ { - var wg sync.WaitGroup - wg.Add(len(nodes.Items)) - for i := range nodes.Items { - go func(node *api.Node) { + toRegister := util.NewStringSet() + var wg sync.WaitGroup + var successfullyRegistered int32 = 0 + for i := range nodes.Items { + node := &nodes.Items[i] + if !toRegister.Has(node.Name) { + wg.Add(1) + toRegister.Insert(node.Name) + go func(n *api.Node) { defer wg.Done() - if registered.Has(node.Name) { - return + for i := 0; i < retryCount; i++ { + if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs { + nc.configureNodeCIDR(n) + } + _, err := nc.kubeClient.Nodes().Create(n) + if err == nil || apierrors.IsAlreadyExists(err) { + glog.Infof("Registered node in registry: %v", n.Name) + atomic.AddInt32(&successfullyRegistered, 1) + return + } else { + glog.Errorf("Error registering node %v (retries left: %v): %v", n.Name, retryCount-i-1, err) + } + time.Sleep(retryInterval) } - if nc.isRunningCloudProvider() { - nc.configureNodeCIDR(node) - } - _, err := nc.kubeClient.Nodes().Create(node) - if err == nil 
|| apierrors.IsAlreadyExists(err) { - registered.Insert(node.Name) - glog.Infof("Registered node in registry: %s", node.Name) - } else { - glog.Errorf("Error registering node %s, retrying: %s", node.Name, err) - } - }(&nodes.Items[i]) + glog.Errorf("Unable to register node %v", n.Name) + }(node) } - wg.Wait() - if registered.Len() == len(nodes.Items) { - glog.Infof("Successfully registered all nodes") - return nil - } - time.Sleep(retryInterval) } - if registered.Len() != len(nodes.Items) { + wg.Wait() + if int32(toRegister.Len()) != atomic.LoadInt32(&successfullyRegistered) { return ErrRegistration } else { return nil @@ -302,7 +308,9 @@ func (nc *NodeController) syncCloudNodes() error { node := nodes.Items[i] nodeMap[node.Name] = &node } - reconcilePodCIDRs(matches, nodes) + if nc.allocateNodeCIDRs { + nc.reconcilePodCIDRs(matches, nodes) + } var wg sync.WaitGroup wg.Add(len(matches.Items)) // Create nodes which have been created in cloud, but not in kubernetes cluster @@ -320,7 +328,9 @@ func (nc *NodeController) syncCloudNodes() error { return } node.Status.Addresses = nodeList.Items[0].Status.Addresses - nc.configureNodeCIDR(node) + if nc.allocateNodeCIDRs { + nc.configureNodeCIDR(node) + } glog.Infof("Create node in registry: %s", node.Name) _, err = nc.kubeClient.Nodes().Create(node) if err != nil { @@ -337,7 +347,9 @@ func (nc *NodeController) syncCloudNodes() error { for nodeID := range nodeMap { go func(nodeID string) { defer wg.Done() - nc.unassignNodeCIDR(nodeID) + if nc.allocateNodeCIDRs { + nc.unassignNodeCIDR(nodeID) + } glog.Infof("Delete node from registry: %s", nodeID) err = nc.kubeClient.Nodes().Delete(nodeID) if err != nil { diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go index 2b4d1514ee9..3995c5f73f5 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller_test.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller_test.go @@ -21,6 +21,7 @@ import ( "fmt" "reflect" "sort" + "sync" "testing" "time" @@ -59,6 +60,9 @@ type FakeNodeHandler struct { UpdatedNodes []*api.Node UpdatedNodeStatuses []*api.Node RequestCount int + + // Synchronization + createLock sync.Mutex } func (c *FakeNodeHandler) Nodes() client.NodeInterface { @@ -66,7 +70,11 @@ func (c *FakeNodeHandler) Nodes() client.NodeInterface { } func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) { - defer func() { m.RequestCount++ }() + m.createLock.Lock() + defer func() { + m.RequestCount++ + m.createLock.Unlock() + }() for _, n := range m.Existing { if n.Name == node.Name { return nil, apierrors.NewAlreadyExists("Minion", node.Name) @@ -238,7 +246,7 @@ func TestRegisterNodes(t *testing.T) { nodes.Items = append(nodes.Items, *newNode(machine)) } nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false) err := nodeController.registerNodes(&nodes, item.retryCount, time.Millisecond) if !item.expectedFail && err != nil { t.Errorf("unexpected error: %v", err) @@ -324,7 +332,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) { } for _, item := range table { nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, 
testNodeStartupGracePeriod, testNodeMonitorPeriod, "") + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false) nodes, err := nodeController.getStaticNodesWithSpec() if err != nil { t.Errorf("unexpected error: %v", err) @@ -386,7 +394,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false) nodes, err := nodeController.getCloudNodesWithSpec() if err != nil { t.Errorf("unexpected error: %v", err) @@ -496,7 +504,7 @@ func TestSyncCloudNodes(t *testing.T) { item.fakeNodeHandler.Fake = testclient.NewSimpleFake() } nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false) if err := nodeController.syncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -580,7 +588,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) { item.fakeNodeHandler.Fake = testclient.NewSimpleFake() } nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false) if err := nodeController.syncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -620,7 +628,7 @@ func TestPopulateNodeAddresses(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute, - util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") + util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false) result, err := nodeController.populateAddresses(item.nodes) // In case of IP querying error, we should continue. 
if err != nil { @@ -820,7 +828,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, - testNodeStartupGracePeriod, testNodeMonitorPeriod, "") + testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) @@ -1022,7 +1030,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(), - testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "") + testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) From 292d33e33c12161c687257fc470298f49680ba72 Mon Sep 17 00:00:00 2001 From: CJ Cullen Date: Tue, 5 May 2015 14:28:36 -0700 Subject: [PATCH 3/3] Add synchronization around nodeMap --- pkg/cloudprovider/gce/gce.go | 1 + pkg/cloudprovider/nodecontroller/nodecontroller.go | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/cloudprovider/gce/gce.go b/pkg/cloudprovider/gce/gce.go index f6057f3902c..14e60ca453c 100644 --- a/pkg/cloudprovider/gce/gce.go +++ b/pkg/cloudprovider/gce/gce.go @@ -44,6 +44,7 @@ import ( ) var ErrMetadataConflict = errors.New("Metadata already set at the same key") + const podCIDRMetadataKey string = "node-ip-range" // GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine. diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go index 1af4c958df5..1b7f444e64d 100644 --- a/pkg/cloudprovider/nodecontroller/nodecontroller.go +++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go @@ -304,9 +304,12 @@ func (nc *NodeController) syncCloudNodes() error { return err } nodeMap := make(map[string]*api.Node) + nodeMapLock := sync.Mutex{} for i := range nodes.Items { node := nodes.Items[i] + nodeMapLock.Lock() nodeMap[node.Name] = &node + nodeMapLock.Unlock() } if nc.allocateNodeCIDRs { nc.reconcilePodCIDRs(matches, nodes) @@ -318,7 +321,10 @@ func (nc *NodeController) syncCloudNodes() error { for i := range matches.Items { go func(node *api.Node) { defer wg.Done() - if _, ok := nodeMap[node.Name]; !ok { + nodeMapLock.Lock() + _, ok := nodeMap[node.Name] + nodeMapLock.Unlock() + if !ok { glog.V(3).Infof("Querying addresses for new node: %s", node.Name) nodeList := &api.NodeList{} nodeList.Items = []api.Node{*node} @@ -337,7 +343,9 @@ func (nc *NodeController) syncCloudNodes() error { glog.Errorf("Create node %s error: %v", node.Name, err) } } + nodeMapLock.Lock() delete(nodeMap, node.Name) + nodeMapLock.Unlock() }(&matches.Items[i]) } wg.Wait()
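Taken together, the three patches leave the allocation flow as follows: reconcilePodCIDRs hands every node that is not yet registered an unused /24 drawn from the cluster's class B range (via the new StringSet.PopAny), the cloud provider's Configure records that CIDR in instance metadata and creates the matching route, and Release tears the route down when the node goes away. Below is a minimal, self-contained sketch of just the reconciliation step, with a plain map standing in for util.StringSet; the reconcile, popAny and node names are illustrative, not taken from the patch.

package main

import "fmt"

// node carries only the fields of api.Node that the reconciliation touches.
type node struct {
    Name    string
    PodCIDR string
}

// generateCIDRs derives num /24 pod ranges from the cluster's class B prefix
// (e.g. "10.244"), as NodeController.generateCIDRs does after patch 2/3.
func generateCIDRs(classB string, num int) map[string]bool {
    cidrs := make(map[string]bool)
    for i := 0; i < num; i++ {
        cidrs[fmt.Sprintf("%s.%d.0/24", classB, i)] = true
    }
    return cidrs
}

// popAny removes and returns an arbitrary element, like util.StringSet.PopAny.
func popAny(set map[string]bool) (string, bool) {
    for cidr := range set {
        delete(set, cidr)
        return cidr, true
    }
    return "", false
}

// reconcile mirrors reconcilePodCIDRs: already-registered nodes keep their
// CIDR; nodes seen for the first time get an unused one from the pool.
func reconcile(classB string, newNodes, registered []node) []node {
    available := generateCIDRs(classB, len(newNodes)+len(registered))
    assigned := make(map[string]string)
    for _, n := range registered {
        assigned[n.Name] = n.PodCIDR
        delete(available, n.PodCIDR)
    }
    for i, n := range newNodes {
        if cidr, ok := assigned[n.Name]; ok {
            newNodes[i].PodCIDR = cidr
        } else if cidr, ok := popAny(available); ok {
            newNodes[i].PodCIDR = cidr
        }
    }
    return newNodes
}

func main() {
    registered := []node{{Name: "minion-1", PodCIDR: "10.244.0.0/24"}}
    fresh := []node{{Name: "minion-1"}, {Name: "minion-2"}}
    for _, n := range reconcile("10.244", fresh, registered) {
        fmt.Printf("%s -> %s\n", n.Name, n.PodCIDR)
    }
}

Keying the assignment on node name is what lets syncCloudNodes re-run safely: nodes that already carry a PodCIDR keep it, and only genuinely new nodes consume a range from the pool.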