Add support for more than 500 results to GCE cloud provider API calls

This covers Instances.List and Routes.List, both of which will definitely
return more than 500 results when supporting 1000 nodes.

Add TODOs to the other GCE List API calls that still need the same pagination fix.

Add more logging to GCE's routecontroller.go when creating or deleting routes.
This commit is contained in:
Alex Mohr 2016-02-26 15:53:33 -08:00
parent 53859b032f
commit 0816fa2072
2 changed files with 163 additions and 85 deletions

View File

@ -66,6 +66,11 @@ const (
//Expected annotations for GCE //Expected annotations for GCE
gceLBAllowSourceRange = "net.beta.kubernetes.io/gce-source-ranges" gceLBAllowSourceRange = "net.beta.kubernetes.io/gce-source-ranges"
// Each page can have 500 results, but we cap how many pages
// are iterated through to prevent infinite loops if the API
// were to continuously return a nextPageToken.
maxPages = 25
) )
//validateAllowSourceRange validates annotation of allow source ranges //validateAllowSourceRange validates annotation of allow source ranges
@ -166,6 +171,7 @@ func getNetworkNameViaMetadata() (string, error) {
} }
func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, error) { func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, error) {
// TODO: use PageToken to list all not just the first 500
networkList, err := svc.Networks.List(projectID).Do() networkList, err := svc.Networks.List(projectID).Do()
if err != nil { if err != nil {
return "", err return "", err
@ -179,6 +185,7 @@ func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, e
} }
func getZonesForRegion(svc *compute.Service, projectID, region string) ([]string, error) { func getZonesForRegion(svc *compute.Service, projectID, region string) ([]string, error) {
// TODO: use PageToken to list all not just the first 500
listCall := svc.Zones.List(projectID) listCall := svc.Zones.List(projectID)
// Filtering by region doesn't seem to work // Filtering by region doesn't seem to work
@ -920,6 +927,9 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
tags := sets.NewString() tags := sets.NewString()
for zone, hostNames := range hostNamesByZone { for zone, hostNames := range hostNamesByZone {
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Instances.List(gce.projectID, zone) listCall := gce.service.Instances.List(gce.projectID, zone)
// Add the filter for hosts // Add the filter for hosts
@ -928,11 +938,15 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
// Add the fields we want // Add the fields we want
listCall = listCall.Fields("items(name,tags)") listCall = listCall.Fields("items(name,tags)")
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
res, err := listCall.Do() res, err := listCall.Do()
if err != nil { if err != nil {
return nil, err return nil, err
} }
pageToken = res.NextPageToken
for _, instance := range res.Items { for _, instance := range res.Items {
longest_tag := "" longest_tag := ""
for _, tag := range instance.Tags.Items { for _, tag := range instance.Tags.Items {
@ -947,6 +961,10 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
} }
} }
} }
if page >= maxPages {
glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages)
}
}
if len(tags) == 0 { if len(tags) == 0 {
glog.V(2).Info("No instances had tags, creating rule without target tags") glog.V(2).Info("No instances had tags, creating rule without target tags")
@ -956,16 +974,28 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
} }
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) { func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
addresses, err := gce.service.Addresses.List(gce.projectID, region).Do() pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Addresses.List(gce.projectID, region)
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
addresses, err := listCall.Do()
if err != nil { if err != nil {
return false, fmt.Errorf("failed to list gce IP addresses: %v", err) return false, fmt.Errorf("failed to list gce IP addresses: %v", err)
} }
pageToken = addresses.NextPageToken
for _, addr := range addresses.Items { for _, addr := range addresses.Items {
if addr.Address == ipAddress { if addr.Address == ipAddress {
// This project does own the address, so return success. // This project does own the address, so return success.
return true, nil return true, nil
} }
} }
}
if page >= maxPages {
glog.Errorf("projectOwnsStaticIP exceeded maxPages=%d for Addresses.List; truncating.", maxPages)
}
return false, nil return false, nil
} }
@ -1301,6 +1331,7 @@ func (gce *GCECloud) DeleteUrlMap(name string) error {
// ListUrlMaps lists all UrlMaps in the project. // ListUrlMaps lists all UrlMaps in the project.
func (gce *GCECloud) ListUrlMaps() (*compute.UrlMapList, error) { func (gce *GCECloud) ListUrlMaps() (*compute.UrlMapList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.UrlMaps.List(gce.projectID).Do() return gce.service.UrlMaps.List(gce.projectID).Do()
} }
@ -1350,6 +1381,7 @@ func (gce *GCECloud) DeleteTargetHttpProxy(name string) error {
// ListTargetHttpProxies lists all TargetHttpProxies in the project. // ListTargetHttpProxies lists all TargetHttpProxies in the project.
func (gce *GCECloud) ListTargetHttpProxies() (*compute.TargetHttpProxyList, error) { func (gce *GCECloud) ListTargetHttpProxies() (*compute.TargetHttpProxyList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.TargetHttpProxies.List(gce.projectID).Do() return gce.service.TargetHttpProxies.List(gce.projectID).Do()
} }
@ -1409,6 +1441,7 @@ func (gce *GCECloud) DeleteTargetHttpsProxy(name string) error {
// ListTargetHttpsProxies lists all TargetHttpsProxies in the project. // ListTargetHttpsProxies lists all TargetHttpsProxies in the project.
func (gce *GCECloud) ListTargetHttpsProxies() (*compute.TargetHttpsProxyList, error) { func (gce *GCECloud) ListTargetHttpsProxies() (*compute.TargetHttpsProxyList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.TargetHttpsProxies.List(gce.projectID).Do() return gce.service.TargetHttpsProxies.List(gce.projectID).Do()
} }
@ -1445,6 +1478,7 @@ func (gce *GCECloud) DeleteSslCertificate(name string) error {
// ListSslCertificates lists all SslCertificates in the project. // ListSslCertificates lists all SslCertificates in the project.
func (gce *GCECloud) ListSslCertificates() (*compute.SslCertificateList, error) { func (gce *GCECloud) ListSslCertificates() (*compute.SslCertificateList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.SslCertificates.List(gce.projectID).Do() return gce.service.SslCertificates.List(gce.projectID).Do()
} }
@ -1499,6 +1533,7 @@ func (gce *GCECloud) GetGlobalForwardingRule(name string) (*compute.ForwardingRu
// ListGlobalForwardingRules lists all GlobalForwardingRules in the project. // ListGlobalForwardingRules lists all GlobalForwardingRules in the project.
func (gce *GCECloud) ListGlobalForwardingRules() (*compute.ForwardingRuleList, error) { func (gce *GCECloud) ListGlobalForwardingRules() (*compute.ForwardingRuleList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.GlobalForwardingRules.List(gce.projectID).Do() return gce.service.GlobalForwardingRules.List(gce.projectID).Do()
} }
@ -1541,6 +1576,7 @@ func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error {
// ListBackendServices lists all backend services in the project. // ListBackendServices lists all backend services in the project.
func (gce *GCECloud) ListBackendServices() (*compute.BackendServiceList, error) { func (gce *GCECloud) ListBackendServices() (*compute.BackendServiceList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.BackendServices.List(gce.projectID).Do() return gce.service.BackendServices.List(gce.projectID).Do()
} }
@ -1591,6 +1627,7 @@ func (gce *GCECloud) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
// ListHttpHealthCheck lists all HttpHealthChecks in the project. // ListHttpHealthCheck lists all HttpHealthChecks in the project.
func (gce *GCECloud) ListHttpHealthChecks() (*compute.HttpHealthCheckList, error) { func (gce *GCECloud) ListHttpHealthChecks() (*compute.HttpHealthCheckList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.HttpHealthChecks.List(gce.projectID).Do() return gce.service.HttpHealthChecks.List(gce.projectID).Do()
} }
@ -1621,11 +1658,13 @@ func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
// ListInstanceGroups lists all InstanceGroups in the project and zone. // ListInstanceGroups lists all InstanceGroups in the project and zone.
func (gce *GCECloud) ListInstanceGroups(zone string) (*compute.InstanceGroupList, error) { func (gce *GCECloud) ListInstanceGroups(zone string) (*compute.InstanceGroupList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.InstanceGroups.List(gce.projectID, zone).Do() return gce.service.InstanceGroups.List(gce.projectID, zone).Do()
} }
// ListInstancesInInstanceGroup lists all the instances in a given instance group and state. // ListInstancesInInstanceGroup lists all the instances in a given instance group and state.
func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, state string) (*compute.InstanceGroupsListInstances, error) { func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, state string) (*compute.InstanceGroupsListInstances, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.InstanceGroups.ListInstances( return gce.service.InstanceGroups.ListInstances(
gce.projectID, zone, name, gce.projectID, zone, name,
&compute.InstanceGroupsListInstancesRequest{InstanceState: state}).Do() &compute.InstanceGroupsListInstancesRequest{InstanceState: state}).Do()
@ -1846,18 +1885,29 @@ func (gce *GCECloud) List(filter string) ([]string, error) {
var instances []string var instances []string
// TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically) // TODO: Parallelize, although O(zones) so not too bad (N <= 3 typically)
for _, zone := range gce.managedZones { for _, zone := range gce.managedZones {
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Instances.List(gce.projectID, zone) listCall := gce.service.Instances.List(gce.projectID, zone)
if len(filter) > 0 { if len(filter) > 0 {
listCall = listCall.Filter("name eq " + filter) listCall = listCall.Filter("name eq " + filter)
} }
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
res, err := listCall.Do() res, err := listCall.Do()
if err != nil { if err != nil {
return nil, err return nil, err
} }
pageToken = res.NextPageToken
for _, instance := range res.Items { for _, instance := range res.Items {
instances = append(instances, instance.Name) instances = append(instances, instance.Name)
} }
} }
if page >= maxPages {
glog.Errorf("List exceeded maxPages=%d for Instances.List: truncating.", maxPages)
}
}
return instances, nil return instances, nil
} }
@ -1878,16 +1928,23 @@ func truncateClusterName(clusterName string) string {
} }
func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) { func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
var routes []*cloudprovider.Route
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Routes.List(gce.projectID) listCall := gce.service.Routes.List(gce.projectID)
prefix := truncateClusterName(clusterName) prefix := truncateClusterName(clusterName)
listCall = listCall.Filter("name eq " + prefix + "-.*") listCall = listCall.Filter("name eq " + prefix + "-.*")
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
res, err := listCall.Do() res, err := listCall.Do()
if err != nil { if err != nil {
glog.Errorf("Error getting routes from GCE: %v", err)
return nil, err return nil, err
} }
var routes []*cloudprovider.Route pageToken = res.NextPageToken
for _, r := range res.Items { for _, r := range res.Items {
if r.Network != gce.networkURL { if r.Network != gce.networkURL {
continue continue
@ -1904,6 +1961,10 @@ func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, err
target := path.Base(r.NextHopInstance) target := path.Base(r.NextHopInstance)
routes = append(routes, &cloudprovider.Route{Name: r.Name, TargetInstance: target, DestinationCIDR: r.DestRange}) routes = append(routes, &cloudprovider.Route{Name: r.Name, TargetInstance: target, DestinationCIDR: r.DestRange})
} }
}
if page >= maxPages {
glog.Errorf("ListRoutes exceeded maxPages=%d for Routes.List; truncating.", maxPages)
}
return routes, nil return routes, nil
} }
@ -2160,6 +2221,7 @@ func (gce *GCECloud) convertDiskToAttachedDisk(disk *gceDisk, readWrite string)
} }
func (gce *GCECloud) listClustersInZone(zone string) ([]string, error) { func (gce *GCECloud) listClustersInZone(zone string) ([]string, error) {
// TODO: use PageToken to list all not just the first 500
list, err := gce.containerService.Projects.Zones.Clusters.List(gce.projectID, zone).Do() list, err := gce.containerService.Projects.Zones.Clusters.List(gce.projectID, zone).Do()
if err != nil { if err != nil {
return nil, err return nil, err
@ -2225,18 +2287,24 @@ func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error)
break break
} }
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Instances.List(gce.projectID, zone) listCall := gce.service.Instances.List(gce.projectID, zone)
// Add the filter for hosts // Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(remaining, "|") + ")") listCall = listCall.Filter("name eq (" + strings.Join(remaining, "|") + ")")
listCall = listCall.Fields("items(name,id,disks,machineType)") listCall = listCall.Fields("items(name,id,disks,machineType)")
if pageToken != "" {
listCall.PageToken(pageToken)
}
res, err := listCall.Do() res, err := listCall.Do()
if err != nil { if err != nil {
return nil, err return nil, err
} }
pageToken = res.NextPageToken
for _, i := range res.Items { for _, i := range res.Items {
name := i.Name name := i.Name
instance := &gceInstance{ instance := &gceInstance{
@ -2249,6 +2317,10 @@ func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error)
instances[name] = instance instances[name] = instance
} }
} }
if page >= maxPages {
glog.Errorf("getInstancesByNames exceeded maxPages=%d for Instances.List: truncating.", maxPages)
}
}
instanceArray := make([]*gceInstance, len(names)) instanceArray := make([]*gceInstance, len(names))
for i, name := range names { for i, name := range names {

View File

@ -91,28 +91,34 @@ func (rc *RouteController) reconcile(nodes []api.Node, routes []*cloudprovider.R
} }
nameHint := string(node.UID) nameHint := string(node.UID)
wg.Add(1) wg.Add(1)
go func(nameHint string, route *cloudprovider.Route) { glog.Infof("Creating route for node %s %s with hint %s", node.Name, route.DestinationCIDR, nameHint)
go func(nodeName string, nameHint string, route *cloudprovider.Route, startTime time.Time) {
if err := rc.routes.CreateRoute(rc.clusterName, nameHint, route); err != nil { if err := rc.routes.CreateRoute(rc.clusterName, nameHint, route); err != nil {
glog.Errorf("Could not create route %s %s: %v", nameHint, route.DestinationCIDR, err) glog.Errorf("Could not create route %s %s for node %s after %v: %v", nameHint, route.DestinationCIDR, nodeName, time.Now().Sub(startTime), err)
} else {
glog.Infof("Created route for node %s %s with hint %s after %v", nodeName, route.DestinationCIDR, nameHint, time.Now().Sub(startTime))
} }
wg.Done() wg.Done()
}(nameHint, route) }(node.Name, nameHint, route, time.Now())
} }
nodeCIDRs[node.Name] = node.Spec.PodCIDR nodeCIDRs[node.Name] = node.Spec.PodCIDR
} }
wg.Wait()
for _, route := range routes { for _, route := range routes {
if rc.isResponsibleForRoute(route) { if rc.isResponsibleForRoute(route) {
// Check if this route applies to a node we know about & has correct CIDR. // Check if this route applies to a node we know about & has correct CIDR.
if nodeCIDRs[route.TargetInstance] != route.DestinationCIDR { if nodeCIDRs[route.TargetInstance] != route.DestinationCIDR {
wg.Add(1) wg.Add(1)
// Delete the route. // Delete the route.
go func(route *cloudprovider.Route) { glog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
go func(route *cloudprovider.Route, startTime time.Time) {
if err := rc.routes.DeleteRoute(rc.clusterName, route); err != nil { if err := rc.routes.DeleteRoute(rc.clusterName, route); err != nil {
glog.Errorf("Could not delete route %s %s: %v", route.Name, route.DestinationCIDR, err) glog.Errorf("Could not delete route %s %s after %v: %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime), err)
} else {
glog.Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime))
} }
wg.Done() wg.Done()
}(route)
}(route, time.Now())
} }
} }
} }