diff --git a/pkg/cloudprovider/providers/gce/BUILD b/pkg/cloudprovider/providers/gce/BUILD index ee79c5d6681..beee30431bb 100644 --- a/pkg/cloudprovider/providers/gce/BUILD +++ b/pkg/cloudprovider/providers/gce/BUILD @@ -41,7 +41,9 @@ go_library( "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/cloudprovider:go_default_library", "//pkg/controller:go_default_library", + "//pkg/master/ports:go_default_library", "//pkg/util/net/sets:go_default_library", + "//pkg/util/version:go_default_library", "//pkg/volume:go_default_library", "//vendor/cloud.google.com/go/compute/metadata:go_default_library", "//vendor/github.com/golang/glog:go_default_library", @@ -70,11 +72,13 @@ go_test( name = "go_default_test", srcs = [ "gce_disks_test.go", + "gce_healthchecks_test.go", "gce_test.go", ], library = ":go_default_library", tags = ["automanaged"], deps = [ + "//pkg/api/v1:go_default_library", "//pkg/cloudprovider:go_default_library", "//vendor/google.golang.org/api/compute/v1:go_default_library", "//vendor/google.golang.org/api/googleapi:go_default_library", diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 06919686b58..a620afcb32b 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -22,6 +22,7 @@ import ( "net/http" "regexp" "strings" + "sync" "time" "cloud.google.com/go/compute/metadata" @@ -80,7 +81,7 @@ type GCECloud struct { serviceBeta *computebeta.Service containerService *container.Service clientBuilder controller.ControllerClientBuilder - ClusterId ClusterId + ClusterID ClusterID projectID string region string localZone string // The zone in which we are running @@ -92,6 +93,11 @@ type GCECloud struct { useMetadataServer bool operationPollRateLimiter flowcontrol.RateLimiter manager ServiceManager + // sharedResourceLock is used to serialize GCE operations that may mutate shared state to + // prevent inconsistencies. For example, load balancers manipulation methods will take the + // lock to prevent shared resources from being prematurely deleted while the operation is + // in progress. + sharedResourceLock sync.Mutex } type ServiceManager interface { @@ -270,10 +276,10 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo } // Initialize takes in a clientBuilder and spawns a goroutine for watching the clusterid configmap. -// This must be called before utilizing the funcs of gce.ClusterId +// This must be called before utilizing the funcs of gce.ClusterID func (gce *GCECloud) Initialize(clientBuilder controller.ControllerClientBuilder) { gce.clientBuilder = clientBuilder - go gce.watchClusterId() + go gce.watchClusterID() } // LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine. diff --git a/pkg/cloudprovider/providers/gce/gce_clusterid.go b/pkg/cloudprovider/providers/gce/gce_clusterid.go index 21e533cb17b..621e927670a 100644 --- a/pkg/cloudprovider/providers/gce/gce_clusterid.go +++ b/pkg/cloudprovider/providers/gce/gce_clusterid.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2017 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -49,18 +49,18 @@ const ( updateFuncFrequency = 10 * time.Minute ) -type ClusterId struct { +type ClusterID struct { idLock sync.RWMutex client clientset.Interface cfgMapKey string store cache.Store - providerId *string - clusterId *string + providerID *string + clusterID *string } -// Continually watches for changes to the cluser id config map -func (gce *GCECloud) watchClusterId() { - gce.ClusterId = ClusterId{ +// Continually watches for changes to the cluster id config map +func (gce *GCECloud) watchClusterID() { + gce.ClusterID = ClusterID{ cfgMapKey: fmt.Sprintf("%v/%v", UIDNamespace, UIDConfigMapName), client: gce.clientBuilder.ClientOrDie("cloud-provider"), } @@ -77,8 +77,8 @@ func (gce *GCECloud) watchClusterId() { return } - glog.V(4).Infof("Observed new configmap for clusterid: %v, %v; setting local values", m.Name, m.Data) - gce.ClusterId.setIds(m) + glog.V(4).Infof("Observed new configmap for clusteriD: %v, %v; setting local values", m.Name, m.Data) + gce.ClusterID.update(m) }, UpdateFunc: func(old, cur interface{}) { m, ok := cur.(*v1.ConfigMap) @@ -96,71 +96,71 @@ func (gce *GCECloud) watchClusterId() { return } - glog.V(4).Infof("Observed updated configmap for clusterid %v, %v; setting local values", m.Name, m.Data) - gce.ClusterId.setIds(m) + glog.V(4).Infof("Observed updated configmap for clusteriD %v, %v; setting local values", m.Name, m.Data) + gce.ClusterID.update(m) }, } - listerWatcher := cache.NewListWatchFromClient(gce.ClusterId.client.Core().RESTClient(), "configmaps", UIDNamespace, fields.Everything()) + listerWatcher := cache.NewListWatchFromClient(gce.ClusterID.client.Core().RESTClient(), "configmaps", UIDNamespace, fields.Everything()) var controller cache.Controller - gce.ClusterId.store, controller = cache.NewInformer(newSingleObjectListerWatcher(listerWatcher, UIDConfigMapName), &v1.ConfigMap{}, updateFuncFrequency, mapEventHandler) + gce.ClusterID.store, controller = cache.NewInformer(newSingleObjectListerWatcher(listerWatcher, UIDConfigMapName), &v1.ConfigMap{}, updateFuncFrequency, mapEventHandler) controller.Run(nil) } -// GetId returns the id which is unique to this cluster +// GetID returns the id which is unique to this cluster // if federated, return the provider id (unique to the cluster) // if not federated, return the cluster id -func (ci *ClusterId) GetId() (string, error) { +func (ci *ClusterID) GetID() (string, error) { if err := ci.getOrInitialize(); err != nil { return "", err } ci.idLock.RLock() defer ci.idLock.RUnlock() - if ci.clusterId == nil { + if ci.clusterID == nil { return "", errors.New("Could not retrieve cluster id") } // If provider ID is set, (Federation is enabled) use this field - if ci.providerId != nil && *ci.providerId != *ci.clusterId { - return *ci.providerId, nil + if ci.providerID != nil { + return *ci.providerID, nil } - // providerId is not set, use the cluster id - return *ci.clusterId, nil + // providerID is not set, use the cluster id + return *ci.clusterID, nil } // GetFederationId returns the id which could represent the entire Federation // or just the cluster if not federated. 
-func (ci *ClusterId) GetFederationId() (string, bool, error) { +func (ci *ClusterID) GetFederationId() (string, bool, error) { if err := ci.getOrInitialize(); err != nil { return "", false, err } ci.idLock.RLock() defer ci.idLock.RUnlock() - if ci.clusterId == nil { + if ci.clusterID == nil { return "", false, errors.New("Could not retrieve cluster id") } // If provider ID is not set, return false - if ci.providerId == nil || *ci.clusterId == *ci.providerId { + if ci.providerID == nil || *ci.clusterID == *ci.providerID { return "", false, nil } - return *ci.clusterId, true, nil + return *ci.clusterID, true, nil } // getOrInitialize either grabs the configmaps current value or defines the value -// and sets the configmap. This is for the case of the user calling GetClusterId() +// and sets the configmap. This is for the case of the user calling GetClusterID() // before the watch has begun. -func (ci *ClusterId) getOrInitialize() error { +func (ci *ClusterID) getOrInitialize() error { if ci.store == nil { - return errors.New("GCECloud.ClusterId is not ready. Call Initialize() before using.") + return errors.New("GCECloud.ClusterID is not ready. Call Initialize() before using.") } - if ci.clusterId != nil { + if ci.clusterID != nil { return nil } @@ -177,7 +177,7 @@ func (ci *ClusterId) getOrInitialize() error { return err } - glog.V(4).Infof("Creating clusterid: %v", newId) + glog.V(4).Infof("Creating clusteriD: %v", newId) cfg := &v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: UIDConfigMapName, @@ -194,12 +194,12 @@ func (ci *ClusterId) getOrInitialize() error { return err } - glog.V(2).Infof("Created a config map containing clusterid: %v", newId) - ci.setIds(cfg) + glog.V(2).Infof("Created a config map containing clusteriD: %v", newId) + ci.update(cfg) return nil } -func (ci *ClusterId) getConfigMap() (bool, error) { +func (ci *ClusterID) getConfigMap() (bool, error) { item, exists, err := ci.store.GetByKey(ci.cfgMapKey) if err != nil { return false, err @@ -214,18 +214,18 @@ func (ci *ClusterId) getConfigMap() (bool, error) { glog.Error(err) return false, err } - ci.setIds(m) + ci.update(m) return true, nil } -func (ci *ClusterId) setIds(m *v1.ConfigMap) { +func (ci *ClusterID) update(m *v1.ConfigMap) { ci.idLock.Lock() defer ci.idLock.Unlock() - if clusterId, exists := m.Data[UIDCluster]; exists { - ci.clusterId = &clusterId + if clusterID, exists := m.Data[UIDCluster]; exists { + ci.clusterID = &clusterID } if provId, exists := m.Data[UIDProvider]; exists { - ci.providerId = &provId + ci.providerID = &provId } } diff --git a/pkg/cloudprovider/providers/gce/gce_healthchecks.go b/pkg/cloudprovider/providers/gce/gce_healthchecks.go index bf168a029f5..6b0f6795c17 100644 --- a/pkg/cloudprovider/providers/gce/gce_healthchecks.go +++ b/pkg/cloudprovider/providers/gce/gce_healthchecks.go @@ -17,11 +17,23 @@ limitations under the License. 
package gce import ( + "fmt" "time" + "k8s.io/kubernetes/pkg/api/v1" + "k8s.io/kubernetes/pkg/master/ports" + utilversion "k8s.io/kubernetes/pkg/util/version" + + "github.com/golang/glog" compute "google.golang.org/api/compute/v1" ) +const ( + minNodesHealthCheckVersion = "1.7.0" + nodesHealthCheckPath = "/healthz" + lbNodesHealthCheckPort = ports.ProxyHealthzPort +) + func newHealthcheckMetricContext(request string) *metricContext { return &metricContext{ start: time.Now(), @@ -178,3 +190,59 @@ func (gce *GCECloud) ListHealthChecks() (*compute.HealthCheckList, error) { v, err := gce.service.HealthChecks.List(gce.projectID).Do() return v, mc.Observe(err) } + +// GetNodesHealthCheckPort returns the health check port used by the GCE load +// balancers (l4) for performing health checks on nodes. +func GetNodesHealthCheckPort() int32 { + return lbNodesHealthCheckPort +} + +// getNodesHealthCheckPath returns the health check path used by the GCE load +// balancers (l4) for performing health checks on nodes. +func getNodesHealthCheckPath() string { + return nodesHealthCheckPath +} + +// makeNodesHealthCheckName returns name of the health check resource used by +// the GCE load balancers (l4) for performing health checks on nodes. +func makeNodesHealthCheckName(clusterID string) string { + return fmt.Sprintf("k8s-%v-node", clusterID) +} + +// MakeHealthCheckFirewallName returns the firewall name used by the GCE load +// balancers (l4) for performing health checks. +func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string { + // TODO: Change below fwName to match the proposed schema: k8s-{clusteriD}-{namespace}-{name}-{shortid}-hc. + fwName := "k8s-" + hcName + "-http-hc" + if isNodesHealthCheck { + fwName = makeNodesHealthCheckName(clusterID) + "-http-hc" + } + return fwName +} + +// isAtLeastMinNodesHealthCheckVersion checks if a version is higher than +// `minNodesHealthCheckVersion`. +func isAtLeastMinNodesHealthCheckVersion(vstring string) bool { + minVersion, err := utilversion.ParseGeneric(minNodesHealthCheckVersion) + if err != nil { + glog.Errorf("MinNodesHealthCheckVersion (%s) is not a valid version string: %v", minNodesHealthCheckVersion, err) + return false + } + version, err := utilversion.ParseGeneric(vstring) + if err != nil { + glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err) + return false + } + return version.AtLeast(minVersion) +} + +// supportsNodesHealthCheck returns false if anyone of the nodes has version +// lower than `minNodesHealthCheckVersion`. +func supportsNodesHealthCheck(nodes []*v1.Node) bool { + for _, node := range nodes { + if !isAtLeastMinNodesHealthCheckVersion(node.Status.NodeInfo.KubeProxyVersion) { + return false + } + } + return true +} diff --git a/pkg/cloudprovider/providers/gce/gce_healthchecks_test.go b/pkg/cloudprovider/providers/gce/gce_healthchecks_test.go new file mode 100644 index 00000000000..11be6795430 --- /dev/null +++ b/pkg/cloudprovider/providers/gce/gce_healthchecks_test.go @@ -0,0 +1,123 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gce + +import ( + "testing" + + "k8s.io/kubernetes/pkg/api/v1" +) + +func TestIsAtLeastMinNodesHealthCheckVersion(t *testing.T) { + testCases := []struct { + version string + expect bool + }{ + {"v1.7.1", true}, + {"v1.7.0-alpha.2.597+276d289b90d322", true}, + {"v1.6.0-beta.3.472+831q821c907t31a", false}, + {"v1.5.2", false}, + } + + for _, tc := range testCases { + if res := isAtLeastMinNodesHealthCheckVersion(tc.version); res != tc.expect { + t.Errorf("%v: want %v, got %v", tc.version, tc.expect, res) + } + } +} + +func TestSupportsNodesHealthCheck(t *testing.T) { + testCases := []struct { + desc string + nodes []*v1.Node + expect bool + }{ + { + "All nodes support nodes health check", + []*v1.Node{ + { + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + KubeProxyVersion: "v1.7.1", + }, + }, + }, + { + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + KubeProxyVersion: "v1.7.0-alpha.2.597+276d289b90d322", + }, + }, + }, + }, + true, + }, + { + "All nodes don't support nodes health check", + []*v1.Node{ + { + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + KubeProxyVersion: "v1.6.0-beta.3.472+831q821c907t31a", + }, + }, + }, + { + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + KubeProxyVersion: "v1.5.2", + }, + }, + }, + }, + false, + }, + { + "One node doesn't support nodes health check", + []*v1.Node{ + { + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + KubeProxyVersion: "v1.7.1", + }, + }, + }, + { + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + KubeProxyVersion: "v1.7.0-alpha.2.597+276d289b90d322", + }, + }, + }, + { + Status: v1.NodeStatus{ + NodeInfo: v1.NodeSystemInfo{ + KubeProxyVersion: "v1.5.2", + }, + }, + }, + }, + false, + }, + } + + for _, tc := range testCases { + if res := supportsNodesHealthCheck(tc.nodes); res != tc.expect { + t.Errorf("%v: want %v, got %v", tc.desc, tc.expect, res) + } + } +} diff --git a/pkg/cloudprovider/providers/gce/gce_loadbalancer.go b/pkg/cloudprovider/providers/gce/gce_loadbalancer.go index f6aa1cad3f5..dd6a09ef4d5 100644 --- a/pkg/cloudprovider/providers/gce/gce_loadbalancer.go +++ b/pkg/cloudprovider/providers/gce/gce_loadbalancer.go @@ -21,7 +21,6 @@ import ( "fmt" "net" "net/http" - "sort" "strconv" "strings" "time" @@ -140,6 +139,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi } hostNames := nodeNames(nodes) + supportsNodesHealthCheck := supportsNodesHealthCheck(nodes) hosts, err := gce.getInstancesByNames(hostNames) if err != nil { return nil, err @@ -289,13 +289,13 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi // without needing to be deleted and recreated. 
if firewallExists { glog.Infof("EnsureLoadBalancer(%v(%v)): updating firewall", loadBalancerName, serviceName) - if err := gce.updateFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil { + if err := gce.updateFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil { return nil, err } glog.Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName) } else { glog.Infof("EnsureLoadBalancer(%v(%v)): creating firewall", loadBalancerName, serviceName) - if err := gce.createFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil { + if err := gce.createFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil { return nil, err } glog.Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName) @@ -310,34 +310,43 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name) } - // Ensure health checks are created for this target pool to pass to createTargetPool for health check links - // Alternately, if the service has ExternalTrafficPolicy field set from Local to Global, we need to recreate - // the target pool without health checks. This needs to be prior to the forwarding rule deletion below otherwise - // it is not possible to delete just the target pool or http health checks later. - var hcToCreate *compute.HttpHealthCheck - hcExisting, err := gce.GetHttpHealthCheck(loadBalancerName) + clusterID, err := gce.ClusterID.GetID() + if err != nil { + return nil, fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err) + } + // Check which health check needs to create and which health check needs to delete. + // Health check management is coupled with target pool operation to prevent leaking. + var hcToCreate, hcToDelete *compute.HttpHealthCheck + hcLocalTrafficExisting, err := gce.GetHttpHealthCheck(loadBalancerName) if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { - return nil, fmt.Errorf("Error checking HTTP health check %s: %v", loadBalancerName, err) + return nil, fmt.Errorf("error checking HTTP health check %s: %v", loadBalancerName, err) } if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" { - glog.V(4).Infof("service %v (%v) needs health checks on :%d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path) - if err != nil { - // This logic exists to detect a transition for a pre-existing service and turn on - // the tpNeedsUpdate flag to delete/recreate fwdrule/tpool adding the health check - // to the target pool. - glog.V(2).Infof("ExternalTrafficPolicy field set to Local on new or pre-existing service") + glog.V(4).Infof("service %v (%v) needs local traffic health checks on: %d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path) + if hcLocalTrafficExisting == nil { + // This logic exists to detect a transition for non-OnlyLocal to OnlyLocal service + // turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the + // target pool to use local traffic health check. 
+ glog.V(2).Infof("Updating from nodes health checks to local traffic health checks for service %v LB %v", apiService.Name, loadBalancerName) + if supportsNodesHealthCheck { + hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort()) + } tpNeedsUpdate = true } - hcToCreate, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort) - if err != nil { - return nil, fmt.Errorf("Failed to ensure health check for localized service %v on node port %v: %v", loadBalancerName, healthCheckNodePort, err) - } + hcToCreate = makeHttpHealthCheck(loadBalancerName, path, healthCheckNodePort) } else { - glog.V(4).Infof("service %v does not need health checks", apiService.Name) - if err == nil { - glog.V(2).Infof("Deleting stale health checks for service %v LB %v", apiService.Name, loadBalancerName) + glog.V(4).Infof("Service %v needs nodes health checks.", apiService.Name) + if hcLocalTrafficExisting != nil { + // This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service + // and turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the + // target pool to use nodes health check. + glog.V(2).Infof("Updating from local traffic health checks to nodes health checks for service %v LB %v", apiService.Name, loadBalancerName) + hcToDelete = hcLocalTrafficExisting tpNeedsUpdate = true } + if supportsNodesHealthCheck { + hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort()) + } } // Now we get to some slightly more interesting logic. // First, neither target pools nor forwarding rules can be updated in place - @@ -357,17 +366,12 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName) } if tpExists && tpNeedsUpdate { - // Generate the list of health checks for this target pool to pass to deleteTargetPool - if path, _ := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" { - var err error - hcExisting, err = gce.GetHttpHealthCheck(loadBalancerName) - if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { - glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err) - } + // Pass healthchecks to deleteTargetPool to cleanup health checks after cleaning up the target pool itself. + var hcNames []string + if hcToDelete != nil { + hcNames = append(hcNames, hcToDelete.Name) } - - // Pass healthchecks to deleteTargetPool to cleanup health checks prior to cleaning up the target pool itself. 
- if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcExisting); err != nil { + if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil { return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err) } glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName) @@ -381,11 +385,11 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi createInstances = createInstances[:maxTargetPoolCreateInstances] } // Pass healthchecks to createTargetPool which needs them as health check links in the target pool - if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType, hcToCreate); err != nil { + if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddress, gce.region, createInstances, affinityType, hcToCreate); err != nil { return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err) } if hcToCreate != nil { - glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks for target pool", loadBalancerName, serviceName) + glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks %v for target pool", loadBalancerName, serviceName, hcToCreate.Name) } if len(hosts) <= maxTargetPoolCreateInstances { glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName) @@ -447,18 +451,29 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, loadBalancerName, gce.region) - var hc *compute.HttpHealthCheck + var hcNames []string if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" { - var err error - hc, err = gce.GetHttpHealthCheck(loadBalancerName) + hcToDelete, err := gce.GetHttpHealthCheck(loadBalancerName) if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err) return err } + hcNames = append(hcNames, hcToDelete.Name) + } else { + clusterID, err := gce.ClusterID.GetID() + if err != nil { + return fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err) + } + // EnsureLoadBalancerDeleted() could be triggered by changing service from + // LoadBalancer type to others. In this case we have no idea whether it was + // using local traffic health check or nodes health check. Attempt to delete + // both to prevent leaking. + hcNames = append(hcNames, loadBalancerName) + hcNames = append(hcNames, makeNodesHealthCheckName(clusterID)) } errs := utilerrors.AggregateGoroutines( - func() error { return gce.deleteFirewall(loadBalancerName, gce.region) }, + func() error { return gce.deleteFirewall(makeFirewallName(loadBalancerName), gce.region) }, // Even though we don't hold on to static IPs for load balancers, it's // possible that EnsureLoadBalancer left one around in a failed // creation/update attempt, so make sure we clean it up here just in case. 
@@ -469,7 +484,7 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil { return err } - if err := gce.deleteTargetPool(loadBalancerName, gce.region, hc); err != nil { + if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil { return err } return nil @@ -509,15 +524,15 @@ func (gce *GCECloud) deleteForwardingRule(name, region string) error { } // DeleteTargetPool deletes the given target pool. -func (gce *GCECloud) DeleteTargetPool(name string, hc *compute.HttpHealthCheck) error { +func (gce *GCECloud) DeleteTargetPool(name string, hcNames ...string) error { region, err := GetGCERegion(gce.localZone) if err != nil { return err } - return gce.deleteTargetPool(name, region, hc) + return gce.deleteTargetPool(name, region, hcNames...) } -func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error { +func (gce *GCECloud) deleteTargetPool(name, region string, hcNames ...string) error { mc := newTargetPoolMetricContext("delete", region) op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do() @@ -535,41 +550,86 @@ func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealt } // Deletion of health checks is allowed only after the TargetPool reference is deleted - if hc != nil { - glog.Infof("Deleting health check %v", hc.Name) - if err := gce.DeleteHttpHealthCheck(hc.Name); err != nil { - glog.Warningf("Failed to delete health check %v: %v", hc, err) + for _, hcName := range hcNames { + if err = func() error { + // Check whether it is nodes health check, which has different name from the load-balancer. + isNodesHealthCheck := hcName != name + if isNodesHealthCheck { + // Lock to prevent deleting necessary nodes health check before it gets attached + // to target pool. + gce.sharedResourceLock.Lock() + defer gce.sharedResourceLock.Unlock() + } + glog.Infof("Deleting health check %v", hcName) + if err := gce.DeleteHttpHealthCheck(hcName); err != nil { + // Delete nodes health checks will fail if any other target pool is using it. + if isInUsedByError(err) { + glog.V(4).Infof("Health check %v is in used: %v.", hcName, err) + return nil + } else if !isHTTPErrorCode(err, http.StatusNotFound) { + glog.Warningf("Failed to delete health check %v: %v", hcName, err) + return err + } + // StatusNotFound could happen when: + // - This is the first attempt but we pass in a healthcheck that is already deleted + // to prevent leaking. + // - This is the first attempt but user manually deleted the heathcheck. + // - This is a retry and in previous round we failed to delete the healthcheck firewall + // after deleted the healthcheck. + // We continue to delete the healthcheck firewall to prevent leaking. + glog.V(4).Infof("Health check %v is already deleted.", hcName) + } + clusterID, err := gce.ClusterID.GetID() + if err != nil { + return fmt.Errorf("error getting cluster ID: %v", err) + } + // If health check is deleted without error, it means no load-balancer is using it. + // So we should delete the health check firewall as well. 
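// (Illustration, not part of the patch: with a hypothetical cluster ID "f1e2d3",
// the shared nodes health check "k8s-f1e2d3-node" maps to the firewall
// "k8s-f1e2d3-node-http-hc", while a per-service local-traffic health check named
// after the load balancer, e.g. "a1b2c3d4", maps to "k8s-a1b2c3d4-http-hc".)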
+ fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck) + glog.Infof("Deleting firewall %v.", fwName) + if err := gce.DeleteFirewall(fwName); err != nil { + if isHTTPErrorCode(err, http.StatusNotFound) { + glog.V(4).Infof("Firewall %v is already deleted.", fwName) + return nil + } + return err + } + return nil + }(); err != nil { return err } - } else { - // This is a HC cleanup attempt to prevent stale HCs when errors are encountered - // during HC deletion in a prior pass through EnsureLoadBalancer. - // The HC name matches the load balancer name - normally this is expected to fail. - if err := gce.DeleteHttpHealthCheck(name); err == nil { - // We only print a warning if this deletion actually succeeded (which - // means there was indeed a stale health check with the LB name. - glog.Warningf("Deleted stale http health check for LB: %s", name) - } } + return nil } -func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error { - var instances []string - for _, host := range hosts { - instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name)) - } +func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error { // health check management is coupled with targetPools to prevent leaks. A // target pool is the only thing that requires a health check, so we delete // associated checks on teardown, and ensure checks on setup. hcLinks := []string{} if hc != nil { + // Check whether it is nodes health check, which has different name from the load-balancer. + isNodesHealthCheck := hc.Name != name + if isNodesHealthCheck { + // Lock to prevent necessary nodes health check / firewall gets deleted. 
+ gce.sharedResourceLock.Lock() + defer gce.sharedResourceLock.Unlock() + } + if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, gce.region, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil { + return err + } var err error - if hc, err = gce.ensureHttpHealthCheck(name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil { + if hc, err = gce.ensureHttpHealthCheck(hc.Name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil { return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hc.Port, hc.RequestPath, err) } hcLinks = append(hcLinks, hc.SelfLink) } + + var instances []string + for _, host := range hosts { + instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name)) + } glog.Infof("Creating targetpool %v with %d healthchecks", name, len(hcLinks)) pool := &compute.TargetPool{ Name: name, @@ -651,8 +711,8 @@ func (gce *GCECloud) targetPoolURL(name, region string) string { return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) } -func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) { - newHC := &compute.HttpHealthCheck{ +func makeHttpHealthCheck(name, path string, port int32) *compute.HttpHealthCheck { + return &compute.HttpHealthCheck{ Name: name, Port: int64(port), RequestPath: path, @@ -663,7 +723,10 @@ func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *c HealthyThreshold: gceHcHealthyThreshold, UnhealthyThreshold: gceHcUnhealthyThreshold, } +} +func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) { + newHC := makeHttpHealthCheck(name, path, port) hc, err = gce.GetHttpHealthCheck(name) if hc == nil || err != nil && isHTTPErrorCode(err, http.StatusNotFound) { glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path) @@ -843,7 +906,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st if isHTTPErrorCode(err, http.StatusNotFound) { return false, true, nil } - return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err) + return false, false, fmt.Errorf("error getting load balancer's firewall: %v", err) } if fw.Description != makeFirewallDescription(serviceName, ipAddress) { return true, true, nil @@ -856,7 +919,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st for ix := range ports { allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port)) } - if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) { + if !equalStringSets(allowedPorts, fw.Allowed[0].Ports) { return true, true, nil } // The service controller already verified that the protocol matches on all ports, no need to check. @@ -875,18 +938,47 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st return true, false, nil } -func slicesEqual(x, y []string) bool { - if len(x) != len(y) { - return false +func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, region string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error { + clusterID, err := gce.ClusterID.GetID() + if err != nil { + return fmt.Errorf("error getting cluster ID: %v", err) } - sort.Strings(x) - sort.Strings(y) - for i := range x { - if x[i] != y[i] { - return false + + // Prepare the firewall params for creating / checking. 
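// (Illustration, not part of the patch: for the shared nodes health check the
// description becomes e.g. {"kubernetes.io/cluster-id":"f1e2d3"}, tying the
// firewall back to the cluster; per-service health check firewalls reuse the
// regular service firewall description built by makeFirewallDescription instead.)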
+ desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID) + if !isNodesHealthCheck { + desc = makeFirewallDescription(serviceName, ipAddress) + } + sourceRanges := lbSrcRngsFlag.ipn + ports := []v1.ServicePort{{Protocol: "tcp", Port: hcPort}} + + fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck) + fw, err := gce.service.Firewalls.Get(gce.projectID, fwName).Do() + if err != nil { + if !isHTTPErrorCode(err, http.StatusNotFound) { + return fmt.Errorf("error getting firewall for health checks: %v", err) } + glog.Infof("Creating firewall %v for health checks.", fwName) + if err := gce.createFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil { + return err + } + glog.Infof("Created firewall %v for health checks.", fwName) + return nil } - return true + // Validate firewall fields. + if fw.Description != desc || + len(fw.Allowed) != 1 || + fw.Allowed[0].IPProtocol != string(ports[0].Protocol) || + !equalStringSets(fw.Allowed[0].Ports, []string{string(ports[0].Port)}) || + !equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) { + glog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName) + if err := gce.updateFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil { + glog.Warningf("Failed to reconcile firewall %v parameters.", fwName) + return err + } + glog.V(4).Infof("Corrected firewall %v parameters successful", fwName) + } + return nil } func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []v1.ServicePort) error { @@ -942,7 +1034,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges nets if err != nil { return mc.Observe(err) } - op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do() + op, err := gce.service.Firewalls.Update(gce.projectID, name, firewall).Do() if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { return mc.Observe(err) } @@ -971,7 +1063,7 @@ func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges nets } firewall := &compute.Firewall{ - Name: makeFirewallName(name), + Name: name, Description: desc, Network: gce.networkURL, SourceRanges: sourceRanges.StringSlice(), @@ -1141,17 +1233,16 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string func (gce *GCECloud) deleteFirewall(name, region string) error { mc := newFirewallMetricContext("delete", region) - fwName := makeFirewallName(name) - op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do() + op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do() if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { glog.V(2).Infof("Firewall %s already deleted. Continuing to delete other resources.", name) } else if err != nil { - glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err) + glog.Warningf("Failed to delete firewall %s, got error %v", name, err) return mc.Observe(err) } else { if err := gce.waitForGlobalOp(op, mc); err != nil { - glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err) + glog.Warningf("Failed waiting for Firewall %s to be deleted. 
Got error: %v", name, err) return err } } diff --git a/pkg/cloudprovider/providers/gce/gce_util.go b/pkg/cloudprovider/providers/gce/gce_util.go index 8f59cd5c495..90b51759f22 100644 --- a/pkg/cloudprovider/providers/gce/gce_util.go +++ b/pkg/cloudprovider/providers/gce/gce_util.go @@ -19,10 +19,12 @@ package gce import ( "errors" "fmt" + "net/http" "regexp" "strings" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "cloud.google.com/go/compute/metadata" compute "google.golang.org/api/compute/v1" @@ -105,6 +107,14 @@ func isHTTPErrorCode(err error, code int) bool { return ok && apiErr.Code == code } +func isInUsedByError(err error) bool { + apiErr, ok := err.(*googleapi.Error) + if !ok || apiErr.Code != http.StatusBadRequest { + return false + } + return strings.Contains(apiErr.Message, "being used by") +} + // splitProviderID splits a provider's id into core components. // A providerID is build out of '${ProviderName}://${project-id}/${zone}/${instance-name}' // See cloudprovider.GetInstanceProviderID. @@ -115,3 +125,12 @@ func splitProviderID(providerID string) (project, zone, instance string, err err } return matches[1], matches[2], matches[3], nil } + +func equalStringSets(x, y []string) bool { + if len(x) != len(y) { + return false + } + xString := sets.NewString(x...) + yString := sets.NewString(y...) + return xString.Equal(yString) +} diff --git a/test/e2e/firewall.go b/test/e2e/firewall.go index d6640e18084..0030dfa57a5 100644 --- a/test/e2e/firewall.go +++ b/test/e2e/firewall.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + "k8s.io/kubernetes/pkg/cloudprovider" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/test/e2e/framework" @@ -45,13 +46,18 @@ var _ = framework.KubeDescribe("Firewall rule", func() { gceCloud = cloudConfig.Provider.(*gcecloud.GCECloud) }) - // This test takes around 4 minutes to run + // This test takes around 6 minutes to run It("[Slow] [Serial] should create valid firewall rules for LoadBalancer type service", func() { ns := f.Namespace.Name // This source ranges is just used to examine we have exact same things on LB firewall rules firewallTestSourceRanges := []string{"0.0.0.0/1", "128.0.0.0/1"} serviceName := "firewall-test-loadbalancer" + By("Getting cluster ID") + clusterID, err := framework.GetClusterID(cs) + Expect(err).NotTo(HaveOccurred()) + framework.Logf("Got cluster ID: %v", clusterID) + jig := framework.NewServiceTestJig(cs, serviceName) nodesNames := jig.GetNodesNames(framework.MaxNodesForEndpointsTests) if len(nodesNames) <= 0 { @@ -59,28 +65,52 @@ var _ = framework.KubeDescribe("Firewall rule", func() { } nodesSet := sets.NewString(nodesNames...) 
- // OnlyLocal service is needed to examine which exact nodes the requests are being forwarded to by the Load Balancer on GCE - By("Creating a LoadBalancer type service with ExternalTrafficPolicy=Local") - svc := jig.CreateOnlyLocalLoadBalancerService(ns, serviceName, - framework.LoadBalancerCreateTimeoutDefault, false, func(svc *v1.Service) { - svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: framework.FirewallTestHttpPort}} - svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges - }) + By("Creating a LoadBalancer type service with ExternalTrafficPolicy=Global") + svc := jig.CreateLoadBalancerService(ns, serviceName, framework.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) { + svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: framework.FirewallTestHttpPort}} + svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges + }) defer func() { jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeNodePort svc.Spec.LoadBalancerSourceRanges = nil }) Expect(cs.Core().Services(svc.Namespace).Delete(svc.Name, nil)).NotTo(HaveOccurred()) + By("Waiting for the local traffic health check firewall rule to be deleted") + localHCFwName := framework.MakeHealthCheckFirewallNameForLBService(clusterID, cloudprovider.GetLoadBalancerName(svc), false) + _, err := framework.WaitForFirewallRule(gceCloud, localHCFwName, false, framework.LoadBalancerCleanupTimeout) + Expect(err).NotTo(HaveOccurred()) }() svcExternalIP := svc.Status.LoadBalancer.Ingress[0].IP - By("Checking if service's firewall rules are correct") + By("Checking if service's firewall rule is correct") nodeTags := framework.GetInstanceTags(cloudConfig, nodesNames[0]) - expFw := framework.ConstructFirewallForLBService(svc, nodeTags.Items) - fw, err := gceCloud.GetFirewall(expFw.Name) + lbFw := framework.ConstructFirewallForLBService(svc, nodeTags.Items) + fw, err := gceCloud.GetFirewall(lbFw.Name) Expect(err).NotTo(HaveOccurred()) - Expect(framework.VerifyFirewallRule(fw, expFw, cloudConfig.Network, false)).NotTo(HaveOccurred()) + Expect(framework.VerifyFirewallRule(fw, lbFw, cloudConfig.Network, false)).NotTo(HaveOccurred()) + + By("Checking if service's nodes health check firewall rule is correct") + nodesHCFw := framework.ConstructHealthCheckFirewallForLBService(clusterID, svc, nodeTags.Items, true) + fw, err = gceCloud.GetFirewall(nodesHCFw.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(framework.VerifyFirewallRule(fw, nodesHCFw, cloudConfig.Network, false)).NotTo(HaveOccurred()) + + // OnlyLocal service is needed to examine which exact nodes the requests are being forwarded to by the Load Balancer on GCE + By("Updating LoadBalancer service to ExternalTrafficPolicy=Local") + svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) { + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }) + + By("Waiting for the nodes health check firewall rule to be deleted") + _, err = framework.WaitForFirewallRule(gceCloud, nodesHCFw.Name, false, framework.LoadBalancerCleanupTimeout) + Expect(err).NotTo(HaveOccurred()) + + By("Waiting for the correct local traffic health check firewall rule to be created") + localHCFw := framework.ConstructHealthCheckFirewallForLBService(clusterID, svc, nodeTags.Items, false) + fw, err = framework.WaitForFirewallRule(gceCloud, localHCFw.Name, true, framework.LoadBalancerCreateTimeoutDefault) + Expect(err).NotTo(HaveOccurred()) + Expect(framework.VerifyFirewallRule(fw, localHCFw, 
cloudConfig.Network, false)).NotTo(HaveOccurred()) By(fmt.Sprintf("Creating netexec pods on at most %v nodes", framework.MaxNodesForEndpointsTests)) for i, nodeName := range nodesNames { @@ -100,7 +130,7 @@ var _ = framework.KubeDescribe("Firewall rule", func() { // by removing the tag on one vm and make sure it doesn't get any traffic. This is an imperfect // simulation, we really want to check that traffic doesn't reach a vm outside the GKE cluster, but // that's much harder to do in the current e2e framework. - By("Removing tags from one of the nodes") + By(fmt.Sprintf("Removing tags from one of the nodes: %v", nodesNames[0])) nodesSet.Delete(nodesNames[0]) removedTags := framework.SetInstanceTags(cloudConfig, nodesNames[0], []string{}) defer func() { diff --git a/test/e2e/framework/BUILD b/test/e2e/framework/BUILD index 530287e91ab..8d96e4ea293 100644 --- a/test/e2e/framework/BUILD +++ b/test/e2e/framework/BUILD @@ -45,6 +45,7 @@ go_library( "//pkg/api/v1/helper:go_default_library", "//pkg/api/v1/node:go_default_library", "//pkg/api/v1/pod:go_default_library", + "//pkg/api/v1/service:go_default_library", "//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/authorization/v1beta1:go_default_library", "//pkg/apis/batch:go_default_library", diff --git a/test/e2e/framework/firewall_util.go b/test/e2e/framework/firewall_util.go index fe6ede0a57d..2de5860777a 100644 --- a/test/e2e/framework/firewall_util.go +++ b/test/e2e/framework/firewall_util.go @@ -18,12 +18,16 @@ package framework import ( "fmt" + "net/http" "strconv" "strings" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/api/v1" + apiservice "k8s.io/kubernetes/pkg/api/v1/service" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/cloudprovider" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" @@ -41,7 +45,7 @@ const ( ) // MakeFirewallNameForLBService return the expected firewall name for a LB service. 
-// This should match the formatting of makeFirewallName() in pkg/cloudprovider/providers/gce/gce.go +// This should match the formatting of makeFirewallName() in pkg/cloudprovider/providers/gce/gce_loadbalancer.go func MakeFirewallNameForLBService(name string) string { return fmt.Sprintf("k8s-fw-%s", name) } @@ -68,6 +72,32 @@ func ConstructFirewallForLBService(svc *v1.Service, nodesTags []string) *compute return &fw } +func MakeHealthCheckFirewallNameForLBService(clusterID, name string, isNodesHealthCheck bool) string { + return gcecloud.MakeHealthCheckFirewallName(clusterID, name, isNodesHealthCheck) +} + +// ConstructHealthCheckFirewallForLBService returns the expected GCE firewall rule for a loadbalancer type service +func ConstructHealthCheckFirewallForLBService(clusterID string, svc *v1.Service, nodesTags []string, isNodesHealthCheck bool) *compute.Firewall { + if svc.Spec.Type != v1.ServiceTypeLoadBalancer { + Failf("can not construct firewall rule for non-loadbalancer type service") + } + fw := compute.Firewall{} + fw.Name = MakeHealthCheckFirewallNameForLBService(clusterID, cloudprovider.GetLoadBalancerName(svc), isNodesHealthCheck) + fw.TargetTags = nodesTags + fw.SourceRanges = gcecloud.LoadBalancerSrcRanges() + healthCheckPort := gcecloud.GetNodesHealthCheckPort() + if !isNodesHealthCheck { + healthCheckPort = apiservice.GetServiceHealthCheckNodePort(svc) + } + fw.Allowed = []*compute.FirewallAllowed{ + { + IPProtocol: "tcp", + Ports: []string{fmt.Sprintf("%d", healthCheckPort)}, + }, + } + return &fw +} + // GetNodeTags gets tags from one of the Kubernetes nodes func GetNodeTags(c clientset.Interface, cloudConfig CloudConfig) *compute.Tags { nodes := GetReadySchedulableNodesOrDie(c) @@ -303,6 +333,9 @@ func SameStringArray(result, expected []string, include bool) error { // VerifyFirewallRule verifies whether the result firewall is consistent with the expected firewall. // When `portsSubset` is false, match given ports exactly. Otherwise, only check ports are included. 
func VerifyFirewallRule(res, exp *compute.Firewall, network string, portsSubset bool) error { + if res == nil || exp == nil { + return fmt.Errorf("res and exp must not be nil") + } if res.Name != exp.Name { return fmt.Errorf("incorrect name: %v, expected %v", res.Name, exp.Name) } @@ -325,3 +358,40 @@ func VerifyFirewallRule(res, exp *compute.Firewall, network string, portsSubset } return nil } + +func WaitForFirewallRule(gceCloud *gcecloud.GCECloud, fwName string, exist bool, timeout time.Duration) (*compute.Firewall, error) { + Logf("Waiting up to %v for firewall %v exist=%v", timeout, fwName, exist) + var fw *compute.Firewall + var err error + + condition := func() (bool, error) { + fw, err = gceCloud.GetFirewall(fwName) + if err != nil && exist || + err == nil && !exist || + err != nil && !exist && !IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) { + return false, nil + } + return true, nil + } + + if err := wait.PollImmediate(5*time.Second, timeout, condition); err != nil { + return nil, fmt.Errorf("error waiting for firewall %v exist=%v", fwName, exist) + } + return fw, nil +} + +func GetClusterID(c clientset.Interface) (string, error) { + cm, err := c.Core().ConfigMaps(metav1.NamespaceSystem).Get(gcecloud.UIDConfigMapName, metav1.GetOptions{}) + if err != nil || cm == nil { + return "", fmt.Errorf("error getting cluster ID: %v", err) + } + clusterID, clusterIDExists := cm.Data[gcecloud.UIDCluster] + providerID, providerIDExists := cm.Data[gcecloud.UIDProvider] + if !clusterIDExists { + return "", fmt.Errorf("cluster ID not set") + } + if providerIDExists { + return providerID, nil + } + return clusterID, nil +} diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index e91cff78b07..238b7ae1e12 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -235,6 +235,25 @@ func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceNa return svc } +// CreateLoadBalancerService creates a loadbalancer service and waits +// for it to acquire an ingress IP. 
+func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service { + By("creating a service " + namespace + "/" + serviceName + " with type=LoadBalancer") + svc := j.CreateTCPServiceOrFail(namespace, func(svc *v1.Service) { + svc.Spec.Type = v1.ServiceTypeLoadBalancer + // We need to turn affinity off for our LB distribution tests + svc.Spec.SessionAffinity = v1.ServiceAffinityNone + if tweak != nil { + tweak(svc) + } + }) + + By("waiting for loadbalancer for service " + namespace + "/" + serviceName) + svc = j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout) + j.SanityCheckService(svc, v1.ServiceTypeLoadBalancer) + return svc +} + func GetNodeAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) { for j := range node.Status.Addresses { nodeAddress := &node.Status.Addresses[j] diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 32b1ebca95d..0f31c013ed9 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -5181,12 +5181,16 @@ func CleanupGCEResources(loadBalancerName string) (retErr error) { if err := DeleteGCEStaticIP(loadBalancerName); err != nil { Logf("%v", err) } + var hcNames []string hc, getErr := gceCloud.GetHttpHealthCheck(loadBalancerName) if getErr != nil && !IsGoogleAPIHTTPErrorCode(getErr, http.StatusNotFound) { retErr = fmt.Errorf("%v\n%v", retErr, getErr) return } - if err := gceCloud.DeleteTargetPool(loadBalancerName, hc); err != nil && + if hc != nil { + hcNames = append(hcNames, hc.Name) + } + if err := gceCloud.DeleteTargetPool(loadBalancerName, hcNames...); err != nil && !IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) { retErr = fmt.Errorf("%v\n%v", retErr, err) }
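Taken together, the patch gates the shared nodes health check on every node's kube-proxy version and derives the shared resource names from the cluster ID. Below is a minimal standalone sketch of that interaction; the package path and the ParseGeneric/AtLeast calls are the ones the patch itself uses, while the helper names, main function, and sample inputs are hypothetical.

package main

import (
	"fmt"

	utilversion "k8s.io/kubernetes/pkg/util/version"
)

// minNodesHealthCheckVersion mirrors the constant added in gce_healthchecks.go.
const minNodesHealthCheckVersion = "1.7.0"

// atLeastMinVersion mirrors isAtLeastMinNodesHealthCheckVersion: a kube-proxy
// version that fails to parse or is older than 1.7.0 disables the shared
// nodes health check for the whole cluster.
func atLeastMinVersion(vstring string) bool {
	minVersion, err := utilversion.ParseGeneric(minNodesHealthCheckVersion)
	if err != nil {
		return false
	}
	v, err := utilversion.ParseGeneric(vstring)
	if err != nil {
		return false
	}
	return v.AtLeast(minVersion)
}

// healthCheckFirewallName mirrors MakeHealthCheckFirewallName: shared nodes
// health checks are keyed by cluster ID, per-service checks by the LB name.
func healthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
	if isNodesHealthCheck {
		return fmt.Sprintf("k8s-%v-node-http-hc", clusterID)
	}
	return "k8s-" + hcName + "-http-hc"
}

func main() {
	// Version gate: a 1.7.x kube-proxy supports the nodes health check, 1.5.x does not.
	fmt.Println(atLeastMinVersion("v1.7.1")) // true
	fmt.Println(atLeastMinVersion("v1.5.2")) // false

	// Resulting firewall names for a hypothetical cluster ID and load balancer name.
	fmt.Println(healthCheckFirewallName("f1e2d3", "", true))          // k8s-f1e2d3-node-http-hc
	fmt.Println(healthCheckFirewallName("f1e2d3", "a1b2c3d4", false)) // k8s-a1b2c3d4-http-hc
}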