mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Merge pull request #46457 from nicksardo/gce-api-refactor
Automatic merge from submit-queue (batch tested with PRs 46407, 46457) GCE - Refactor API for firewall and backend service creation **What this PR does / why we need it**: - Currently, firewall creation function actually instantiates the firewall object; this is inconsistent with the rest of GCE api calls. The API normally gets passed in an existing object. - Necessary information for firewall creation, (`computeHostTags`,`nodeTags`,`networkURL`,`subnetworkURL`,`region`) were private to within the package. These now have public getters. - Consumers might need to know whether the cluster is running on a cross-project network. A new `OnXPN` func will make that information available. - Backend services for regions have been added. Global ones have been renamed to specify global. - NamedPort management of instance groups has been changed from an `AddPortsToInstanceGroup` func (and missing complementary `Remove...`) to a single, simple `SetNamedPortsOfInstanceGroup` - Addressed nitpick review comments of #45524 ILB needs the regional backend services and firewall refactor. The ingress controller needs the new `OnXPN` func to decide whether to create a firewall. **Release note**: ```release-note NONE ```
This commit is contained in:
commit
1444d252e1
@ -29,6 +29,7 @@ import (
|
||||
|
||||
"gopkg.in/gcfg.v1"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/util/flowcontrol"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
@ -77,19 +78,25 @@ const (
|
||||
|
||||
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
|
||||
type GCECloud struct {
|
||||
ClusterID ClusterID
|
||||
|
||||
service *compute.Service
|
||||
serviceBeta *computebeta.Service
|
||||
containerService *container.Service
|
||||
clientBuilder controller.ControllerClientBuilder
|
||||
ClusterID ClusterID
|
||||
projectID string
|
||||
region string
|
||||
localZone string // The zone in which we are running
|
||||
managedZones []string // List of zones we are spanning (for multi-AZ clusters, primarily when running on master)
|
||||
networkURL string
|
||||
subnetworkURL string
|
||||
nodeTags []string // List of tags to use on firewall rules for load balancers
|
||||
nodeInstancePrefix string // If non-"", an advisory prefix for all nodes in the cluster
|
||||
networkProjectID string
|
||||
onXPN bool
|
||||
nodeTags []string // List of tags to use on firewall rules for load balancers
|
||||
lastComputedNodeTags []string // List of node tags calculated in GetHostTags()
|
||||
lastKnownNodeNames sets.String // List of hostnames used to calculate lastComputedHostTags in GetHostTags(names)
|
||||
computeNodeTagLock sync.Mutex // Lock for computing and setting node tags
|
||||
nodeInstancePrefix string // If non-"", an advisory prefix for all nodes in the cluster
|
||||
useMetadataServer bool
|
||||
operationPollRateLimiter flowcontrol.RateLimiter
|
||||
manager ServiceManager
|
||||
@ -243,6 +250,12 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
|
||||
networkURL = gceNetworkURL(projectID, networkName)
|
||||
}
|
||||
|
||||
networkProjectID, err := getProjectIDInURL(networkURL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
onXPN := networkProjectID != projectID
|
||||
|
||||
if len(managedZones) == 0 {
|
||||
managedZones, err = getZonesForRegion(service, projectID, region)
|
||||
if err != nil {
|
||||
@ -260,6 +273,8 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
|
||||
serviceBeta: serviceBeta,
|
||||
containerService: containerService,
|
||||
projectID: projectID,
|
||||
networkProjectID: networkProjectID,
|
||||
onXPN: onXPN,
|
||||
region: region,
|
||||
localZone: zone,
|
||||
managedZones: managedZones,
|
||||
@ -311,6 +326,26 @@ func (gce *GCECloud) ProviderName() string {
|
||||
return ProviderName
|
||||
}
|
||||
|
||||
// Region returns the region
|
||||
func (gce *GCECloud) Region() string {
|
||||
return gce.region
|
||||
}
|
||||
|
||||
// OnXPN returns true if the cluster is running on a cross project network (XPN)
|
||||
func (gce *GCECloud) OnXPN() bool {
|
||||
return gce.onXPN
|
||||
}
|
||||
|
||||
// NetworkURL returns the network url
|
||||
func (gce *GCECloud) NetworkURL() string {
|
||||
return gce.networkURL
|
||||
}
|
||||
|
||||
// SubnetworkURL returns the subnetwork url
|
||||
func (gce *GCECloud) SubnetworkURL() string {
|
||||
return gce.subnetworkURL
|
||||
}
|
||||
|
||||
// Known-useless DNS search path.
|
||||
var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`)
|
||||
|
||||
@ -336,6 +371,20 @@ func gceSubnetworkURL(project, region, subnetwork string) string {
|
||||
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/subnetworks/%s", project, region, subnetwork)
|
||||
}
|
||||
|
||||
// getProjectIDInURL parses typical full resource URLS and shorter URLS
|
||||
// https://www.googleapis.com/compute/v1/projects/myproject/global/networks/mycustom
|
||||
// projects/myproject/global/networks/mycustom
|
||||
// All return "myproject"
|
||||
func getProjectIDInURL(urlStr string) (string, error) {
|
||||
fields := strings.Split(urlStr, "/")
|
||||
for i, v := range fields {
|
||||
if v == "projects" && i < len(fields)-1 {
|
||||
return fields[i+1], nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("could not find project field in url: %v", urlStr)
|
||||
}
|
||||
|
||||
func getNetworkNameViaMetadata() (string, error) {
|
||||
result, err := metadata.Get("instance/network-interfaces/0/network")
|
||||
if err != nil {
|
||||
|
@ -30,15 +30,15 @@ func newBackendServiceMetricContext(request string) *metricContext {
|
||||
}
|
||||
}
|
||||
|
||||
// GetBackendService retrieves a backend by name.
|
||||
func (gce *GCECloud) GetBackendService(name string) (*compute.BackendService, error) {
|
||||
// GetGlobalBackendService retrieves a backend by name.
|
||||
func (gce *GCECloud) GetGlobalBackendService(name string) (*compute.BackendService, error) {
|
||||
mc := newBackendServiceMetricContext("get")
|
||||
v, err := gce.service.BackendServices.Get(gce.projectID, name).Do()
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// UpdateBackendService applies the given BackendService as an update to an existing service.
|
||||
func (gce *GCECloud) UpdateBackendService(bg *compute.BackendService) error {
|
||||
// UpdateGlobalBackendService applies the given BackendService as an update to an existing service.
|
||||
func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) error {
|
||||
mc := newBackendServiceMetricContext("update")
|
||||
op, err := gce.service.BackendServices.Update(gce.projectID, bg.Name, bg).Do()
|
||||
if err != nil {
|
||||
@ -48,8 +48,8 @@ func (gce *GCECloud) UpdateBackendService(bg *compute.BackendService) error {
|
||||
return gce.waitForGlobalOp(op, mc)
|
||||
}
|
||||
|
||||
// DeleteBackendService deletes the given BackendService by name.
|
||||
func (gce *GCECloud) DeleteBackendService(name string) error {
|
||||
// DeleteGlobalBackendService deletes the given BackendService by name.
|
||||
func (gce *GCECloud) DeleteGlobalBackendService(name string) error {
|
||||
mc := newBackendServiceMetricContext("delete")
|
||||
op, err := gce.service.BackendServices.Delete(gce.projectID, name).Do()
|
||||
if err != nil {
|
||||
@ -62,8 +62,8 @@ func (gce *GCECloud) DeleteBackendService(name string) error {
|
||||
return gce.waitForGlobalOp(op, mc)
|
||||
}
|
||||
|
||||
// CreateBackendService creates the given BackendService.
|
||||
func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error {
|
||||
// CreateGlobalBackendService creates the given BackendService.
|
||||
func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) error {
|
||||
mc := newBackendServiceMetricContext("create")
|
||||
op, err := gce.service.BackendServices.Insert(gce.projectID, bg).Do()
|
||||
if err != nil {
|
||||
@ -73,16 +73,81 @@ func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error {
|
||||
return gce.waitForGlobalOp(op, mc)
|
||||
}
|
||||
|
||||
// ListBackendServices lists all backend services in the project.
|
||||
func (gce *GCECloud) ListBackendServices() (*compute.BackendServiceList, error) {
|
||||
// ListGlobalBackendServices lists all backend services in the project.
|
||||
func (gce *GCECloud) ListGlobalBackendServices() (*compute.BackendServiceList, error) {
|
||||
mc := newBackendServiceMetricContext("list")
|
||||
// TODO: use PageToken to list all not just the first 500
|
||||
return gce.service.BackendServices.List(gce.projectID).Do()
|
||||
v, err := gce.service.BackendServices.List(gce.projectID).Do()
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// GetHealth returns the health of the BackendService identified by the given
|
||||
// GetGlobalBackendServiceHealth returns the health of the BackendService identified by the given
|
||||
// name, in the given instanceGroup. The instanceGroupLink is the fully
|
||||
// qualified self link of an instance group.
|
||||
func (gce *GCECloud) GetHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
|
||||
func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
|
||||
mc := newBackendServiceMetricContext("get_health")
|
||||
groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink}
|
||||
return gce.service.BackendServices.GetHealth(gce.projectID, name, groupRef).Do()
|
||||
v, err := gce.service.BackendServices.GetHealth(gce.projectID, name, groupRef).Do()
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// GetRegionBackendService retrieves a backend by name.
|
||||
func (gce *GCECloud) GetRegionBackendService(name, region string) (*compute.BackendService, error) {
|
||||
mc := newBackendServiceMetricContext("get")
|
||||
v, err := gce.service.RegionBackendServices.Get(gce.projectID, region, name).Do()
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// UpdateRegionBackendService applies the given BackendService as an update to an existing service.
|
||||
func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService) error {
|
||||
mc := newBackendServiceMetricContext("update")
|
||||
op, err := gce.service.RegionBackendServices.Update(gce.projectID, bg.Region, bg.Name, bg).Do()
|
||||
if err != nil {
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
return gce.waitForRegionOp(op, bg.Region, mc)
|
||||
}
|
||||
|
||||
// DeleteRegionBackendService deletes the given BackendService by name.
|
||||
func (gce *GCECloud) DeleteRegionBackendService(name, region string) error {
|
||||
mc := newBackendServiceMetricContext("delete")
|
||||
op, err := gce.service.RegionBackendServices.Delete(gce.projectID, region, name).Do()
|
||||
if err != nil {
|
||||
if isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
return nil
|
||||
}
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
return gce.waitForRegionOp(op, region, mc)
|
||||
}
|
||||
|
||||
// CreateRegionBackendService creates the given BackendService.
|
||||
func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService) error {
|
||||
mc := newBackendServiceMetricContext("create")
|
||||
op, err := gce.service.RegionBackendServices.Insert(gce.projectID, bg.Region, bg).Do()
|
||||
if err != nil {
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
return gce.waitForRegionOp(op, bg.Region, mc)
|
||||
}
|
||||
|
||||
// ListRegionBackendServices lists all backend services in the project.
|
||||
func (gce *GCECloud) ListRegionBackendServices(region string) (*compute.BackendServiceList, error) {
|
||||
mc := newBackendServiceMetricContext("list")
|
||||
// TODO: use PageToken to list all not just the first 500
|
||||
v, err := gce.service.RegionBackendServices.List(gce.projectID, region).Do()
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// GetRegionalBackendServiceHealth returns the health of the BackendService identified by the given
|
||||
// name, in the given instanceGroup. The instanceGroupLink is the fully
|
||||
// qualified self link of an instance group.
|
||||
func (gce *GCECloud) GetRegionalBackendServiceHealth(name, region string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
|
||||
mc := newBackendServiceMetricContext("get_health")
|
||||
groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink}
|
||||
v, err := gce.service.RegionBackendServices.GetHealth(gce.projectID, region, name, groupRef).Do()
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
@ -19,96 +19,51 @@ package gce
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
netsets "k8s.io/kubernetes/pkg/util/net/sets"
|
||||
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
)
|
||||
|
||||
func newFirewallMetricContext(request string, region string) *metricContext {
|
||||
func newFirewallMetricContext(request string) *metricContext {
|
||||
return &metricContext{
|
||||
start: time.Now(),
|
||||
attributes: []string{"firewall_" + request, region, unusedMetricLabel},
|
||||
attributes: []string{"firewall_" + request, unusedMetricLabel, unusedMetricLabel},
|
||||
}
|
||||
}
|
||||
|
||||
// GetFirewall returns the Firewall by name.
|
||||
func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
|
||||
mc := newFirewallMetricContext("get", "")
|
||||
mc := newFirewallMetricContext("get")
|
||||
v, err := gce.service.Firewalls.Get(gce.projectID, name).Do()
|
||||
return v, mc.Observe(err)
|
||||
}
|
||||
|
||||
// CreateFirewall creates the given firewall rule.
|
||||
func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges netsets.IPNet, ports []int64, hostNames []string) error {
|
||||
region, err := GetGCERegion(gce.localZone)
|
||||
// CreateFirewall creates the passed firewall
|
||||
func (gce *GCECloud) CreateFirewall(f *compute.Firewall) error {
|
||||
mc := newFirewallMetricContext("create")
|
||||
op, err := gce.service.Firewalls.Insert(gce.projectID, f).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
mc := newFirewallMetricContext("create", region)
|
||||
// TODO: This completely breaks modularity in the cloudprovider but
|
||||
// the methods shared with the TCPLoadBalancer take v1.ServicePorts.
|
||||
svcPorts := []v1.ServicePort{}
|
||||
// TODO: Currently the only consumer of this method is the GCE L7
|
||||
// loadbalancer controller, which never needs a protocol other than
|
||||
// TCP. We should pipe through a mapping of port:protocol and
|
||||
// default to TCP if UDP ports are required. This means the method
|
||||
// signature will change forcing downstream clients to refactor
|
||||
// interfaces.
|
||||
for _, p := range ports {
|
||||
svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP})
|
||||
}
|
||||
|
||||
hosts, err := gce.getInstancesByNames(hostNames)
|
||||
if err != nil {
|
||||
mc.Observe(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return mc.Observe(gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts))
|
||||
return gce.waitForGlobalOp(op, mc)
|
||||
}
|
||||
|
||||
// DeleteFirewall deletes the given firewall rule.
|
||||
func (gce *GCECloud) DeleteFirewall(name string) error {
|
||||
region, err := GetGCERegion(gce.localZone)
|
||||
mc := newFirewallMetricContext("delete")
|
||||
op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
mc := newFirewallMetricContext("delete", region)
|
||||
|
||||
return mc.Observe(gce.deleteFirewall(name, region))
|
||||
return gce.waitForGlobalOp(op, mc)
|
||||
}
|
||||
|
||||
// UpdateFirewall applies the given firewall rule as an update to an
|
||||
// existing firewall rule with the same name.
|
||||
func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges netsets.IPNet, ports []int64, hostNames []string) error {
|
||||
|
||||
region, err := GetGCERegion(gce.localZone)
|
||||
// UpdateFirewall applies the given firewall as an update to an existing service.
|
||||
func (gce *GCECloud) UpdateFirewall(f *compute.Firewall) error {
|
||||
mc := newFirewallMetricContext("update")
|
||||
op, err := gce.service.Firewalls.Update(gce.projectID, f.Name, f).Do()
|
||||
if err != nil {
|
||||
return err
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
mc := newFirewallMetricContext("update", region)
|
||||
// TODO: This completely breaks modularity in the cloudprovider but
|
||||
// the methods shared with the TCPLoadBalancer take v1.ServicePorts.
|
||||
svcPorts := []v1.ServicePort{}
|
||||
// TODO: Currently the only consumer of this method is the GCE L7
|
||||
// loadbalancer controller, which never needs a protocol other than
|
||||
// TCP. We should pipe through a mapping of port:protocol and
|
||||
// default to TCP if UDP ports are required. This means the method
|
||||
// signature will change, forcing downstream clients to refactor
|
||||
// interfaces.
|
||||
for _, p := range ports {
|
||||
svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP})
|
||||
}
|
||||
|
||||
hosts, err := gce.getInstancesByNames(hostNames)
|
||||
if err != nil {
|
||||
mc.Observe(err)
|
||||
return err
|
||||
}
|
||||
|
||||
return mc.Observe(gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts))
|
||||
return gce.waitForGlobalOp(op, mc)
|
||||
}
|
||||
|
@ -29,11 +29,22 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
minNodesHealthCheckVersion = "1.7.0"
|
||||
nodesHealthCheckPath = "/healthz"
|
||||
lbNodesHealthCheckPort = ports.ProxyHealthzPort
|
||||
nodesHealthCheckPath = "/healthz"
|
||||
lbNodesHealthCheckPort = ports.ProxyHealthzPort
|
||||
)
|
||||
|
||||
var (
|
||||
minNodesHealthCheckVersion *utilversion.Version
|
||||
)
|
||||
|
||||
func init() {
|
||||
if v, err := utilversion.ParseGeneric("1.7.0"); err != nil {
|
||||
panic(err)
|
||||
} else {
|
||||
minNodesHealthCheckVersion = v
|
||||
}
|
||||
}
|
||||
|
||||
func newHealthcheckMetricContext(request string) *metricContext {
|
||||
return &metricContext{
|
||||
start: time.Now(),
|
||||
@ -212,28 +223,22 @@ func makeNodesHealthCheckName(clusterID string) string {
|
||||
// MakeHealthCheckFirewallName returns the firewall name used by the GCE load
|
||||
// balancers (l4) for performing health checks.
|
||||
func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
|
||||
// TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc.
|
||||
fwName := "k8s-" + hcName + "-http-hc"
|
||||
if isNodesHealthCheck {
|
||||
fwName = makeNodesHealthCheckName(clusterID) + "-http-hc"
|
||||
// TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc.
|
||||
return makeNodesHealthCheckName(clusterID) + "-http-hc"
|
||||
}
|
||||
return fwName
|
||||
return "k8s-" + hcName + "-http-hc"
|
||||
}
|
||||
|
||||
// isAtLeastMinNodesHealthCheckVersion checks if a version is higher than
|
||||
// `minNodesHealthCheckVersion`.
|
||||
func isAtLeastMinNodesHealthCheckVersion(vstring string) bool {
|
||||
minVersion, err := utilversion.ParseGeneric(minNodesHealthCheckVersion)
|
||||
if err != nil {
|
||||
glog.Errorf("MinNodesHealthCheckVersion (%s) is not a valid version string: %v", minNodesHealthCheckVersion, err)
|
||||
return false
|
||||
}
|
||||
version, err := utilversion.ParseGeneric(vstring)
|
||||
if err != nil {
|
||||
glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err)
|
||||
return false
|
||||
}
|
||||
return version.AtLeast(minVersion)
|
||||
return version.AtLeast(minNodesHealthCheckVersion)
|
||||
}
|
||||
|
||||
// supportsNodesHealthCheck returns false if anyone of the nodes has version
|
||||
|
@ -17,12 +17,8 @@ limitations under the License.
|
||||
package gce
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
compute "google.golang.org/api/compute/v1"
|
||||
)
|
||||
|
||||
@ -37,7 +33,6 @@ func newInstanceGroupMetricContext(request string, zone string) *metricContext {
|
||||
// instances. It is the callers responsibility to add named ports.
|
||||
func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
|
||||
mc := newInstanceGroupMetricContext("create", zone)
|
||||
|
||||
op, err := gce.service.InstanceGroups.Insert(
|
||||
gce.projectID, zone, &compute.InstanceGroup{Name: name}).Do()
|
||||
if err != nil {
|
||||
@ -55,12 +50,10 @@ func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.Ins
|
||||
// DeleteInstanceGroup deletes an instance group.
|
||||
func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
|
||||
mc := newInstanceGroupMetricContext("delete", zone)
|
||||
|
||||
op, err := gce.service.InstanceGroups.Delete(
|
||||
gce.projectID, zone, name).Do()
|
||||
if err != nil {
|
||||
mc.Observe(err)
|
||||
return err
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
return gce.waitForZoneOp(op, zone, mc)
|
||||
@ -103,10 +96,8 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta
|
||||
&compute.InstanceGroupsAddInstancesRequest{
|
||||
Instances: instances,
|
||||
}).Do()
|
||||
|
||||
if err != nil {
|
||||
mc.Observe(err)
|
||||
return err
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
return gce.waitForZoneOp(op, zone, mc)
|
||||
@ -130,55 +121,24 @@ func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string,
|
||||
&compute.InstanceGroupsRemoveInstancesRequest{
|
||||
Instances: instances,
|
||||
}).Do()
|
||||
|
||||
if err != nil {
|
||||
if isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
mc.Observe(nil)
|
||||
return nil
|
||||
}
|
||||
|
||||
mc.Observe(err)
|
||||
return err
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
return gce.waitForZoneOp(op, zone, mc)
|
||||
}
|
||||
|
||||
// AddPortToInstanceGroup adds a port to the given instance group.
|
||||
func (gce *GCECloud) AddPortToInstanceGroup(ig *compute.InstanceGroup, port int64) (*compute.NamedPort, error) {
|
||||
mc := newInstanceGroupMetricContext("add_port", ig.Zone)
|
||||
for _, np := range ig.NamedPorts {
|
||||
if np.Port == port {
|
||||
glog.V(3).Infof("Instance group %v already has named port %+v", ig.Name, np)
|
||||
return np, nil
|
||||
}
|
||||
}
|
||||
|
||||
glog.Infof("Adding port %v to instance group %v with %d ports", port, ig.Name, len(ig.NamedPorts))
|
||||
namedPort := compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port}
|
||||
ig.NamedPorts = append(ig.NamedPorts, &namedPort)
|
||||
|
||||
// setNamedPorts is a zonal endpoint, meaning we invoke it by re-creating a URL like:
|
||||
// {project}/zones/{zone}/instanceGroups/{instanceGroup}/setNamedPorts, so the "zone"
|
||||
// parameter given to SetNamedPorts must not be the entire zone URL.
|
||||
zoneURLParts := strings.Split(ig.Zone, "/")
|
||||
zone := zoneURLParts[len(zoneURLParts)-1]
|
||||
|
||||
// SetNamedPortsOfInstanceGroup sets the list of named ports on a given instance group
|
||||
func (gce *GCECloud) SetNamedPortsOfInstanceGroup(igName, zone string, namedPorts []*compute.NamedPort) error {
|
||||
mc := newInstanceGroupMetricContext("set_namedports", zone)
|
||||
op, err := gce.service.InstanceGroups.SetNamedPorts(
|
||||
gce.projectID, zone, ig.Name,
|
||||
&compute.InstanceGroupsSetNamedPortsRequest{
|
||||
NamedPorts: ig.NamedPorts}).Do()
|
||||
|
||||
gce.projectID, zone, igName,
|
||||
&compute.InstanceGroupsSetNamedPortsRequest{NamedPorts: namedPorts}).Do()
|
||||
if err != nil {
|
||||
mc.Observe(err)
|
||||
return nil, err
|
||||
return mc.Observe(err)
|
||||
}
|
||||
|
||||
if err = gce.waitForZoneOp(op, zone, mc); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &namedPort, nil
|
||||
return gce.waitForZoneOp(op, zone, mc)
|
||||
}
|
||||
|
||||
// GetInstanceGroup returns an instance group by name.
|
||||
|
@ -443,3 +443,117 @@ func (gce *GCECloud) isCurrentInstance(instanceID string) bool {
|
||||
|
||||
return currentInstanceID == canonicalizeInstanceName(instanceID)
|
||||
}
|
||||
|
||||
// ComputeHostTags grabs all tags from all instances being added to the pool.
|
||||
// * The longest tag that is a prefix of the instance name is used
|
||||
// * If any instance has no matching prefix tag, return error
|
||||
// Invoking this method to get host tags is risky since it depends on the format
|
||||
// of the host names in the cluster. Only use it as a fallback if gce.nodeTags
|
||||
// is unspecified
|
||||
func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
|
||||
// TODO: We could store the tags in gceInstance, so we could have already fetched it
|
||||
hostNamesByZone := make(map[string]map[string]bool) // map of zones -> map of names -> bool (for easy lookup)
|
||||
nodeInstancePrefix := gce.nodeInstancePrefix
|
||||
for _, host := range hosts {
|
||||
if !strings.HasPrefix(host.Name, gce.nodeInstancePrefix) {
|
||||
glog.Warningf("instance '%s' does not conform to prefix '%s', ignoring filter", host, gce.nodeInstancePrefix)
|
||||
nodeInstancePrefix = ""
|
||||
}
|
||||
|
||||
z, ok := hostNamesByZone[host.Zone]
|
||||
if !ok {
|
||||
z = make(map[string]bool)
|
||||
hostNamesByZone[host.Zone] = z
|
||||
}
|
||||
z[host.Name] = true
|
||||
}
|
||||
|
||||
tags := sets.NewString()
|
||||
|
||||
for zone, hostNames := range hostNamesByZone {
|
||||
pageToken := ""
|
||||
page := 0
|
||||
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
|
||||
listCall := gce.service.Instances.List(gce.projectID, zone)
|
||||
|
||||
if nodeInstancePrefix != "" {
|
||||
// Add the filter for hosts
|
||||
listCall = listCall.Filter("name eq " + nodeInstancePrefix + ".*")
|
||||
}
|
||||
|
||||
// Add the fields we want
|
||||
// TODO(zmerlynn): Internal bug 29524655
|
||||
// listCall = listCall.Fields("items(name,tags)")
|
||||
|
||||
if pageToken != "" {
|
||||
listCall = listCall.PageToken(pageToken)
|
||||
}
|
||||
|
||||
res, err := listCall.Do()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pageToken = res.NextPageToken
|
||||
for _, instance := range res.Items {
|
||||
if !hostNames[instance.Name] {
|
||||
continue
|
||||
}
|
||||
|
||||
longest_tag := ""
|
||||
for _, tag := range instance.Tags.Items {
|
||||
if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
|
||||
longest_tag = tag
|
||||
}
|
||||
}
|
||||
if len(longest_tag) > 0 {
|
||||
tags.Insert(longest_tag)
|
||||
} else {
|
||||
return nil, fmt.Errorf("Could not find any tag that is a prefix of instance name for instance %s", instance.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
if page >= maxPages {
|
||||
glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages)
|
||||
}
|
||||
}
|
||||
if len(tags) == 0 {
|
||||
return nil, fmt.Errorf("No instances found")
|
||||
}
|
||||
return tags.List(), nil
|
||||
}
|
||||
|
||||
// GetNodeTags will first try returning the list of tags specified in GCE cloud Configuration.
|
||||
// If they weren't provided, it'll compute the host tags with the given hostnames. If the list
|
||||
// of hostnames has not changed, a cached set of nodetags are returned.
|
||||
func (gce *GCECloud) GetNodeTags(nodeNames []string) ([]string, error) {
|
||||
// If nodeTags were specified through configuration, use them
|
||||
if len(gce.nodeTags) > 0 {
|
||||
return gce.nodeTags, nil
|
||||
}
|
||||
|
||||
gce.computeNodeTagLock.Lock()
|
||||
defer gce.computeNodeTagLock.Unlock()
|
||||
|
||||
// Early return if hosts have not changed
|
||||
hosts := sets.NewString(nodeNames...)
|
||||
if hosts.Equal(gce.lastKnownNodeNames) {
|
||||
return gce.lastComputedNodeTags, nil
|
||||
}
|
||||
|
||||
// Get GCE instance data by hostname
|
||||
instances, err := gce.getInstancesByNames(nodeNames)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Determine list of host tags
|
||||
tags, err := gce.computeHostTags(instances)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Save the list of tags
|
||||
gce.lastKnownNodeNames = hosts
|
||||
gce.lastComputedNodeTags = tags
|
||||
return tags, nil
|
||||
}
|
||||
|
@ -1010,7 +1010,7 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
|
||||
}
|
||||
|
||||
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
|
||||
mc := newFirewallMetricContext("create", region)
|
||||
mc := newFirewallMetricContext("create")
|
||||
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
|
||||
if err != nil {
|
||||
return mc.Observe(err)
|
||||
@ -1029,7 +1029,7 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges nets
|
||||
}
|
||||
|
||||
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
|
||||
mc := newFirewallMetricContext("update", region)
|
||||
mc := newFirewallMetricContext("update")
|
||||
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
|
||||
if err != nil {
|
||||
return mc.Observe(err)
|
||||
@ -1083,84 +1083,6 @@ func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges nets
|
||||
return firewall, nil
|
||||
}
|
||||
|
||||
// ComputeHostTags grabs all tags from all instances being added to the pool.
|
||||
// * The longest tag that is a prefix of the instance name is used
|
||||
// * If any instance has no matching prefix tag, return error
|
||||
// Invoking this method to get host tags is risky since it depends on the format
|
||||
// of the host names in the cluster. Only use it as a fallback if gce.nodeTags
|
||||
// is unspecified
|
||||
func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
|
||||
// TODO: We could store the tags in gceInstance, so we could have already fetched it
|
||||
hostNamesByZone := make(map[string]map[string]bool) // map of zones -> map of names -> bool (for easy lookup)
|
||||
nodeInstancePrefix := gce.nodeInstancePrefix
|
||||
for _, host := range hosts {
|
||||
if !strings.HasPrefix(host.Name, gce.nodeInstancePrefix) {
|
||||
glog.Warningf("instance '%s' does not conform to prefix '%s', ignoring filter", host, gce.nodeInstancePrefix)
|
||||
nodeInstancePrefix = ""
|
||||
}
|
||||
|
||||
z, ok := hostNamesByZone[host.Zone]
|
||||
if !ok {
|
||||
z = make(map[string]bool)
|
||||
hostNamesByZone[host.Zone] = z
|
||||
}
|
||||
z[host.Name] = true
|
||||
}
|
||||
|
||||
tags := sets.NewString()
|
||||
|
||||
for zone, hostNames := range hostNamesByZone {
|
||||
pageToken := ""
|
||||
page := 0
|
||||
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
|
||||
listCall := gce.service.Instances.List(gce.projectID, zone)
|
||||
|
||||
if nodeInstancePrefix != "" {
|
||||
// Add the filter for hosts
|
||||
listCall = listCall.Filter("name eq " + nodeInstancePrefix + ".*")
|
||||
}
|
||||
|
||||
// Add the fields we want
|
||||
// TODO(zmerlynn): Internal bug 29524655
|
||||
// listCall = listCall.Fields("items(name,tags)")
|
||||
|
||||
if pageToken != "" {
|
||||
listCall = listCall.PageToken(pageToken)
|
||||
}
|
||||
|
||||
res, err := listCall.Do()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pageToken = res.NextPageToken
|
||||
for _, instance := range res.Items {
|
||||
if !hostNames[instance.Name] {
|
||||
continue
|
||||
}
|
||||
|
||||
longest_tag := ""
|
||||
for _, tag := range instance.Tags.Items {
|
||||
if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
|
||||
longest_tag = tag
|
||||
}
|
||||
}
|
||||
if len(longest_tag) > 0 {
|
||||
tags.Insert(longest_tag)
|
||||
} else {
|
||||
return nil, fmt.Errorf("Could not find any tag that is a prefix of instance name for instance %s", instance.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
if page >= maxPages {
|
||||
glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages)
|
||||
}
|
||||
}
|
||||
if len(tags) == 0 {
|
||||
return nil, fmt.Errorf("No instances found")
|
||||
}
|
||||
return tags.List(), nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
|
||||
pageToken := ""
|
||||
page := 0
|
||||
@ -1232,7 +1154,7 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
|
||||
}
|
||||
|
||||
func (gce *GCECloud) deleteFirewall(name, region string) error {
|
||||
mc := newFirewallMetricContext("delete", region)
|
||||
mc := newFirewallMetricContext("delete")
|
||||
op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do()
|
||||
|
||||
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
|
@ -149,16 +149,6 @@ func TestScrubDNS(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateFirewallFails(t *testing.T) {
|
||||
name := "loadbalancer"
|
||||
region := "us-central1"
|
||||
desc := "description"
|
||||
gce := &GCECloud{}
|
||||
if err := gce.createFirewall(name, region, desc, nil, nil, nil); err == nil {
|
||||
t.Errorf("error expected when creating firewall without any tags found")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitProviderID(t *testing.T) {
|
||||
providers := []struct {
|
||||
providerID string
|
||||
|
@ -478,7 +478,7 @@ func (cont *GCEIngressController) deleteURLMap(del bool) (msg string) {
|
||||
|
||||
func (cont *GCEIngressController) deleteBackendService(del bool) (msg string) {
|
||||
gceCloud := cont.Cloud.Provider.(*gcecloud.GCECloud)
|
||||
beList, err := gceCloud.ListBackendServices()
|
||||
beList, err := gceCloud.ListGlobalBackendServices()
|
||||
if err != nil {
|
||||
if cont.isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
return msg
|
||||
@ -495,7 +495,7 @@ func (cont *GCEIngressController) deleteBackendService(del bool) (msg string) {
|
||||
}
|
||||
if del {
|
||||
Logf("Deleting backed-service: %s", be.Name)
|
||||
if err := gceCloud.DeleteBackendService(be.Name); err != nil &&
|
||||
if err := gceCloud.DeleteGlobalBackendService(be.Name); err != nil &&
|
||||
!cont.isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
msg += fmt.Sprintf("Failed to delete backend service %v: %v\n", be.Name, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user