Merge pull request #22094 from alex-mohr/routes

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-02-29 05:46:51 -08:00
commit fe03c663d9
2 changed files with 163 additions and 85 deletions

View File

@ -66,6 +66,11 @@ const (
//Expected annotations for GCE
gceLBAllowSourceRange = "net.beta.kubernetes.io/gce-source-ranges"
// Each page can have 500 results, but we cap how many pages
// are iterated through to prevent infinite loops if the API
// were to continuously return a nextPageToken.
maxPages = 25
)
//validateAllowSourceRange validates annotation of allow source ranges
@ -179,6 +184,7 @@ func getNetworkNameViaMetadata() (string, error) {
}
func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, error) {
// TODO: use PageToken to list all not just the first 500
networkList, err := svc.Networks.List(projectID).Do()
if err != nil {
return "", err
@ -192,6 +198,7 @@ func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, e
}
func getZonesForRegion(svc *compute.Service, projectID, region string) ([]string, error) {
// TODO: use PageToken to list all not just the first 500
listCall := svc.Zones.List(projectID)
// Filtering by region doesn't seem to work
@ -938,31 +945,42 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
tags := sets.NewString()
for zone, hostNames := range hostNamesByZone {
listCall := gce.service.Instances.List(gce.projectID, zone)
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Instances.List(gce.projectID, zone)
// Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(hostNames, "|") + ")")
// Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(hostNames, "|") + ")")
// Add the fields we want
listCall = listCall.Fields("items(name,tags)")
// Add the fields we want
listCall = listCall.Fields("items(name,tags)")
res, err := listCall.Do()
if err != nil {
return nil, err
}
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
for _, instance := range res.Items {
longest_tag := ""
for _, tag := range instance.Tags.Items {
if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
longest_tag = tag
res, err := listCall.Do()
if err != nil {
return nil, err
}
pageToken = res.NextPageToken
for _, instance := range res.Items {
longest_tag := ""
for _, tag := range instance.Tags.Items {
if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
longest_tag = tag
}
}
if len(longest_tag) > 0 {
tags.Insert(longest_tag)
} else if len(tags) > 0 {
return nil, fmt.Errorf("Some, but not all, instances have prefix tags (%s is missing)", instance.Name)
}
}
if len(longest_tag) > 0 {
tags.Insert(longest_tag)
} else if len(tags) > 0 {
return nil, fmt.Errorf("Some, but not all, instances have prefix tags (%s is missing)", instance.Name)
}
}
if page >= maxPages {
glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages)
}
}
@ -974,15 +992,27 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
}
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do()
if err != nil {
return false, fmt.Errorf("failed to list gce IP addresses: %v", err)
}
for _, addr := range addresses.Items {
if addr.Address == ipAddress {
// This project does own the address, so return success.
return true, nil
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Addresses.List(gce.projectID, region)
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
addresses, err := listCall.Do()
if err != nil {
return false, fmt.Errorf("failed to list gce IP addresses: %v", err)
}
pageToken = addresses.NextPageToken
for _, addr := range addresses.Items {
if addr.Address == ipAddress {
// This project does own the address, so return success.
return true, nil
}
}
}
if page >= maxPages {
glog.Errorf("projectOwnsStaticIP exceeded maxPages=%d for Addresses.List; truncating.", maxPages)
}
return false, nil
}
@ -1322,6 +1352,7 @@ func (gce *GCECloud) DeleteUrlMap(name string) error {
// ListUrlMaps lists all UrlMaps in the project.
func (gce *GCECloud) ListUrlMaps() (*compute.UrlMapList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.UrlMaps.List(gce.projectID).Do()
}
@ -1371,6 +1402,7 @@ func (gce *GCECloud) DeleteTargetHttpProxy(name string) error {
// ListTargetHttpProxies lists all TargetHttpProxies in the project.
func (gce *GCECloud) ListTargetHttpProxies() (*compute.TargetHttpProxyList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.TargetHttpProxies.List(gce.projectID).Do()
}
@ -1430,6 +1462,7 @@ func (gce *GCECloud) DeleteTargetHttpsProxy(name string) error {
// ListTargetHttpsProxies lists all TargetHttpsProxies in the project.
func (gce *GCECloud) ListTargetHttpsProxies() (*compute.TargetHttpsProxyList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.TargetHttpsProxies.List(gce.projectID).Do()
}
@ -1466,6 +1499,7 @@ func (gce *GCECloud) DeleteSslCertificate(name string) error {
// ListSslCertificates lists all SslCertificates in the project.
func (gce *GCECloud) ListSslCertificates() (*compute.SslCertificateList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.SslCertificates.List(gce.projectID).Do()
}
@ -1520,6 +1554,7 @@ func (gce *GCECloud) GetGlobalForwardingRule(name string) (*compute.ForwardingRu
// ListGlobalForwardingRules lists all GlobalForwardingRules in the project.
func (gce *GCECloud) ListGlobalForwardingRules() (*compute.ForwardingRuleList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.GlobalForwardingRules.List(gce.projectID).Do()
}
@ -1562,6 +1597,7 @@ func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error {
// ListBackendServices lists all backend services in the project.
func (gce *GCECloud) ListBackendServices() (*compute.BackendServiceList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.BackendServices.List(gce.projectID).Do()
}
@ -1612,6 +1648,7 @@ func (gce *GCECloud) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
// ListHttpHealthCheck lists all HttpHealthChecks in the project.
func (gce *GCECloud) ListHttpHealthChecks() (*compute.HttpHealthCheckList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.HttpHealthChecks.List(gce.projectID).Do()
}
@ -1642,11 +1679,13 @@ func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
// ListInstanceGroups lists all InstanceGroups in the project and zone.
func (gce *GCECloud) ListInstanceGroups(zone string) (*compute.InstanceGroupList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.InstanceGroups.List(gce.projectID, zone).Do()
}
// ListInstancesInInstanceGroup lists all the instances in a given instance group and state.
func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, state string) (*compute.InstanceGroupsListInstances, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.InstanceGroups.ListInstances(
gce.projectID, zone, name,
&compute.InstanceGroupsListInstancesRequest{InstanceState: state}).Do()
@ -1885,16 +1924,27 @@ func (gce *GCECloud) List(filter string) ([]string, error) {
var instances []string
// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
for _, zone := range gce.managedZones {
listCall := gce.service.Instances.List(gce.projectID, zone)
if len(filter) > 0 {
listCall = listCall.Filter("name eq " + filter)
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Instances.List(gce.projectID, zone)
if len(filter) > 0 {
listCall = listCall.Filter("name eq " + filter)
}
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
res, err := listCall.Do()
if err != nil {
return nil, err
}
pageToken = res.NextPageToken
for _, instance := range res.Items {
instances = append(instances, instance.Name)
}
}
res, err := listCall.Do()
if err != nil {
return nil, err
}
for _, instance := range res.Items {
instances = append(instances, instance.Name)
if page >= maxPages {
glog.Errorf("List exceeded maxPages=%d for Instances.List: truncating.", maxPages)
}
}
return instances, nil
@ -1917,31 +1967,42 @@ func truncateClusterName(clusterName string) string {
}
func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
listCall := gce.service.Routes.List(gce.projectID)
prefix := truncateClusterName(clusterName)
listCall = listCall.Filter("name eq " + prefix + "-.*")
res, err := listCall.Do()
if err != nil {
return nil, err
}
var routes []*cloudprovider.Route
for _, r := range res.Items {
if r.Network != gce.networkURL {
continue
}
// Not managed if route description != "k8s-node-route"
if r.Description != k8sNodeRouteTag {
continue
}
// Not managed if route name doesn't start with <clusterName>
if !strings.HasPrefix(r.Name, prefix) {
continue
}
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Routes.List(gce.projectID)
target := path.Base(r.NextHopInstance)
routes = append(routes, &cloudprovider.Route{Name: r.Name, TargetInstance: target, DestinationCIDR: r.DestRange})
prefix := truncateClusterName(clusterName)
listCall = listCall.Filter("name eq " + prefix + "-.*")
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
res, err := listCall.Do()
if err != nil {
glog.Errorf("Error getting routes from GCE: %v", err)
return nil, err
}
pageToken = res.NextPageToken
for _, r := range res.Items {
if r.Network != gce.networkURL {
continue
}
// Not managed if route description != "k8s-node-route"
if r.Description != k8sNodeRouteTag {
continue
}
// Not managed if route name doesn't start with <clusterName>
if !strings.HasPrefix(r.Name, prefix) {
continue
}
target := path.Base(r.NextHopInstance)
routes = append(routes, &cloudprovider.Route{Name: r.Name, TargetInstance: target, DestinationCIDR: r.DestRange})
}
}
if page >= maxPages {
glog.Errorf("ListRoutes exceeded maxPages=%d for Routes.List; truncating.", maxPages)
}
return routes, nil
}
@ -2199,6 +2260,7 @@ func (gce *GCECloud) convertDiskToAttachedDisk(disk *gceDisk, readWrite string)
}
func (gce *GCECloud) listClustersInZone(zone string) ([]string, error) {
// TODO: use PageToken to list all not just the first 500
list, err := gce.containerService.Projects.Zones.Clusters.List(gce.projectID, zone).Do()
if err != nil {
return nil, err
@ -2264,28 +2326,38 @@ func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error)
break
}
listCall := gce.service.Instances.List(gce.projectID, zone)
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Instances.List(gce.projectID, zone)
// Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(remaining, "|") + ")")
// Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(remaining, "|") + ")")
listCall = listCall.Fields("items(name,id,disks,machineType)")
res, err := listCall.Do()
if err != nil {
return nil, err
}
for _, i := range res.Items {
name := i.Name
instance := &gceInstance{
Zone: zone,
Name: name,
ID: i.Id,
Disks: i.Disks,
Type: lastComponent(i.MachineType),
listCall = listCall.Fields("items(name,id,disks,machineType)")
if pageToken != "" {
listCall.PageToken(pageToken)
}
instances[name] = instance
res, err := listCall.Do()
if err != nil {
return nil, err
}
pageToken = res.NextPageToken
for _, i := range res.Items {
name := i.Name
instance := &gceInstance{
Zone: zone,
Name: name,
ID: i.Id,
Disks: i.Disks,
Type: lastComponent(i.MachineType),
}
instances[name] = instance
}
}
if page >= maxPages {
glog.Errorf("getInstancesByNames exceeded maxPages=%d for Instances.List: truncating.", maxPages)
}
}

View File

@ -91,28 +91,34 @@ func (rc *RouteController) reconcile(nodes []api.Node, routes []*cloudprovider.R
}
nameHint := string(node.UID)
wg.Add(1)
go func(nameHint string, route *cloudprovider.Route) {
glog.Infof("Creating route for node %s %s with hint %s", node.Name, route.DestinationCIDR, nameHint)
go func(nodeName string, nameHint string, route *cloudprovider.Route, startTime time.Time) {
if err := rc.routes.CreateRoute(rc.clusterName, nameHint, route); err != nil {
glog.Errorf("Could not create route %s %s: %v", nameHint, route.DestinationCIDR, err)
glog.Errorf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Now().Sub(startTime), err)
} else {
glog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Now().Sub(startTime))
}
wg.Done()
}(nameHint, route)
}(node.Name, nameHint, route, time.Now())
}
nodeCIDRs[node.Name] = node.Spec.PodCIDR
}
wg.Wait()
for _, route := range routes {
if rc.isResponsibleForRoute(route) {
// Check if this route applies to a node we know about & has correct CIDR.
if nodeCIDRs[route.TargetInstance] != route.DestinationCIDR {
wg.Add(1)
// Delete the route.
go func(route *cloudprovider.Route) {
glog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
go func(route *cloudprovider.Route, startTime time.Time) {
if err := rc.routes.DeleteRoute(rc.clusterName, route); err != nil {
glog.Errorf("Could not delete route %s %s: %v", route.Name, route.DestinationCIDR, err)
glog.Errorf("Could not delete route %s %s after %v: %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime), err)
} else {
glog.Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime))
}
wg.Done()
}(route)
}(route, time.Now())
}
}
}