Merge pull request #45524 from MrHohn/l4-lb-healthcheck

Automatic merge from submit-queue (batch tested with PRs 46252, 45524, 46236, 46277, 46522)

Make GCE load-balancers create health checks for nodes

From #14661. Proposal on kubernetes/community#552. Fixes #46313.

Bullet points:
- Create a nodes health check and firewall rule (for health checking) for non-OnlyLocal services.
- Create a local traffic health check and firewall rule (for health checking) for OnlyLocal services.
- Version skew:
   - Don't create the nodes health check if any node has a version < 1.7.0.
   - Don't backfill the nodes health check on existing LBs unless users explicitly trigger it.

**Release note**:

```release-note
GCE Cloud Provider: Newly created LoadBalancer type Services now have health checks for nodes by default.
An existing LoadBalancer will have a health check attached to it when:
- Service.Spec.Type is changed from LoadBalancer to another type and then flipped back.
- Any effective change is made to Service.Spec.ExternalTrafficPolicy.
```
This commit is contained in:
Kubernetes Submit Queue 2017-05-26 19:47:57 -07:00 committed by GitHub
commit daee6d4826
12 changed files with 571 additions and 136 deletions

View File

@ -41,7 +41,9 @@ go_library(
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/util/net/sets:go_default_library", "//pkg/util/net/sets:go_default_library",
"//pkg/util/version:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//vendor/cloud.google.com/go/compute/metadata:go_default_library", "//vendor/cloud.google.com/go/compute/metadata:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
@ -70,11 +72,13 @@ go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"gce_disks_test.go", "gce_disks_test.go",
"gce_healthchecks_test.go",
"gce_test.go", "gce_test.go",
], ],
library = ":go_default_library", library = ":go_default_library",
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api/v1:go_default_library",
"//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider:go_default_library",
"//vendor/google.golang.org/api/compute/v1:go_default_library", "//vendor/google.golang.org/api/compute/v1:go_default_library",
"//vendor/google.golang.org/api/googleapi:go_default_library", "//vendor/google.golang.org/api/googleapi:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"net/http" "net/http"
"regexp" "regexp"
"strings" "strings"
"sync"
"time" "time"
"cloud.google.com/go/compute/metadata" "cloud.google.com/go/compute/metadata"
@ -80,7 +81,7 @@ type GCECloud struct {
serviceBeta *computebeta.Service serviceBeta *computebeta.Service
containerService *container.Service containerService *container.Service
clientBuilder controller.ControllerClientBuilder clientBuilder controller.ControllerClientBuilder
ClusterId ClusterId ClusterID ClusterID
projectID string projectID string
region string region string
localZone string // The zone in which we are running localZone string // The zone in which we are running
@ -92,6 +93,11 @@ type GCECloud struct {
useMetadataServer bool useMetadataServer bool
operationPollRateLimiter flowcontrol.RateLimiter operationPollRateLimiter flowcontrol.RateLimiter
manager ServiceManager manager ServiceManager
// sharedResourceLock is used to serialize GCE operations that may mutate shared state to
// prevent inconsistencies. For example, load balancers manipulation methods will take the
// lock to prevent shared resources from being prematurely deleted while the operation is
// in progress.
sharedResourceLock sync.Mutex
} }
type ServiceManager interface { type ServiceManager interface {
@ -270,10 +276,10 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
} }
// Initialize takes in a clientBuilder and spawns a goroutine for watching the clusterid configmap. // Initialize takes in a clientBuilder and spawns a goroutine for watching the clusterid configmap.
// This must be called before utilizing the funcs of gce.ClusterId // This must be called before utilizing the funcs of gce.ClusterID
func (gce *GCECloud) Initialize(clientBuilder controller.ControllerClientBuilder) { func (gce *GCECloud) Initialize(clientBuilder controller.ControllerClientBuilder) {
gce.clientBuilder = clientBuilder gce.clientBuilder = clientBuilder
go gce.watchClusterId() go gce.watchClusterID()
} }
// LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine. // LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine.

View File

@ -1,5 +1,5 @@
/* /*
Copyright 2014 The Kubernetes Authors. Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
@ -49,18 +49,18 @@ const (
updateFuncFrequency = 10 * time.Minute updateFuncFrequency = 10 * time.Minute
) )
type ClusterId struct { type ClusterID struct {
idLock sync.RWMutex idLock sync.RWMutex
client clientset.Interface client clientset.Interface
cfgMapKey string cfgMapKey string
store cache.Store store cache.Store
providerId *string providerID *string
clusterId *string clusterID *string
} }
// Continually watches for changes to the cluser id config map // Continually watches for changes to the cluster id config map
func (gce *GCECloud) watchClusterId() { func (gce *GCECloud) watchClusterID() {
gce.ClusterId = ClusterId{ gce.ClusterID = ClusterID{
cfgMapKey: fmt.Sprintf("%v/%v", UIDNamespace, UIDConfigMapName), cfgMapKey: fmt.Sprintf("%v/%v", UIDNamespace, UIDConfigMapName),
client: gce.clientBuilder.ClientOrDie("cloud-provider"), client: gce.clientBuilder.ClientOrDie("cloud-provider"),
} }
@ -77,8 +77,8 @@ func (gce *GCECloud) watchClusterId() {
return return
} }
glog.V(4).Infof("Observed new configmap for clusterid: %v, %v; setting local values", m.Name, m.Data) glog.V(4).Infof("Observed new configmap for clusteriD: %v, %v; setting local values", m.Name, m.Data)
gce.ClusterId.setIds(m) gce.ClusterID.update(m)
}, },
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
m, ok := cur.(*v1.ConfigMap) m, ok := cur.(*v1.ConfigMap)
@ -96,71 +96,71 @@ func (gce *GCECloud) watchClusterId() {
return return
} }
glog.V(4).Infof("Observed updated configmap for clusterid %v, %v; setting local values", m.Name, m.Data) glog.V(4).Infof("Observed updated configmap for clusteriD %v, %v; setting local values", m.Name, m.Data)
gce.ClusterId.setIds(m) gce.ClusterID.update(m)
}, },
} }
listerWatcher := cache.NewListWatchFromClient(gce.ClusterId.client.Core().RESTClient(), "configmaps", UIDNamespace, fields.Everything()) listerWatcher := cache.NewListWatchFromClient(gce.ClusterID.client.Core().RESTClient(), "configmaps", UIDNamespace, fields.Everything())
var controller cache.Controller var controller cache.Controller
gce.ClusterId.store, controller = cache.NewInformer(newSingleObjectListerWatcher(listerWatcher, UIDConfigMapName), &v1.ConfigMap{}, updateFuncFrequency, mapEventHandler) gce.ClusterID.store, controller = cache.NewInformer(newSingleObjectListerWatcher(listerWatcher, UIDConfigMapName), &v1.ConfigMap{}, updateFuncFrequency, mapEventHandler)
controller.Run(nil) controller.Run(nil)
} }
// GetId returns the id which is unique to this cluster // GetID returns the id which is unique to this cluster
// if federated, return the provider id (unique to the cluster) // if federated, return the provider id (unique to the cluster)
// if not federated, return the cluster id // if not federated, return the cluster id
func (ci *ClusterId) GetId() (string, error) { func (ci *ClusterID) GetID() (string, error) {
if err := ci.getOrInitialize(); err != nil { if err := ci.getOrInitialize(); err != nil {
return "", err return "", err
} }
ci.idLock.RLock() ci.idLock.RLock()
defer ci.idLock.RUnlock() defer ci.idLock.RUnlock()
if ci.clusterId == nil { if ci.clusterID == nil {
return "", errors.New("Could not retrieve cluster id") return "", errors.New("Could not retrieve cluster id")
} }
// If provider ID is set, (Federation is enabled) use this field // If provider ID is set, (Federation is enabled) use this field
if ci.providerId != nil && *ci.providerId != *ci.clusterId { if ci.providerID != nil {
return *ci.providerId, nil return *ci.providerID, nil
} }
// providerId is not set, use the cluster id // providerID is not set, use the cluster id
return *ci.clusterId, nil return *ci.clusterID, nil
} }
// GetFederationId returns the id which could represent the entire Federation // GetFederationId returns the id which could represent the entire Federation
// or just the cluster if not federated. // or just the cluster if not federated.
func (ci *ClusterId) GetFederationId() (string, bool, error) { func (ci *ClusterID) GetFederationId() (string, bool, error) {
if err := ci.getOrInitialize(); err != nil { if err := ci.getOrInitialize(); err != nil {
return "", false, err return "", false, err
} }
ci.idLock.RLock() ci.idLock.RLock()
defer ci.idLock.RUnlock() defer ci.idLock.RUnlock()
if ci.clusterId == nil { if ci.clusterID == nil {
return "", false, errors.New("Could not retrieve cluster id") return "", false, errors.New("Could not retrieve cluster id")
} }
// If provider ID is not set, return false // If provider ID is not set, return false
if ci.providerId == nil || *ci.clusterId == *ci.providerId { if ci.providerID == nil || *ci.clusterID == *ci.providerID {
return "", false, nil return "", false, nil
} }
return *ci.clusterId, true, nil return *ci.clusterID, true, nil
} }
// getOrInitialize either grabs the configmaps current value or defines the value // getOrInitialize either grabs the configmaps current value or defines the value
// and sets the configmap. This is for the case of the user calling GetClusterId() // and sets the configmap. This is for the case of the user calling GetClusterID()
// before the watch has begun. // before the watch has begun.
func (ci *ClusterId) getOrInitialize() error { func (ci *ClusterID) getOrInitialize() error {
if ci.store == nil { if ci.store == nil {
return errors.New("GCECloud.ClusterId is not ready. Call Initialize() before using.") return errors.New("GCECloud.ClusterID is not ready. Call Initialize() before using.")
} }
if ci.clusterId != nil { if ci.clusterID != nil {
return nil return nil
} }
@ -177,7 +177,7 @@ func (ci *ClusterId) getOrInitialize() error {
return err return err
} }
glog.V(4).Infof("Creating clusterid: %v", newId) glog.V(4).Infof("Creating clusteriD: %v", newId)
cfg := &v1.ConfigMap{ cfg := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: UIDConfigMapName, Name: UIDConfigMapName,
@ -194,12 +194,12 @@ func (ci *ClusterId) getOrInitialize() error {
return err return err
} }
glog.V(2).Infof("Created a config map containing clusterid: %v", newId) glog.V(2).Infof("Created a config map containing clusteriD: %v", newId)
ci.setIds(cfg) ci.update(cfg)
return nil return nil
} }
func (ci *ClusterId) getConfigMap() (bool, error) { func (ci *ClusterID) getConfigMap() (bool, error) {
item, exists, err := ci.store.GetByKey(ci.cfgMapKey) item, exists, err := ci.store.GetByKey(ci.cfgMapKey)
if err != nil { if err != nil {
return false, err return false, err
@ -214,18 +214,18 @@ func (ci *ClusterId) getConfigMap() (bool, error) {
glog.Error(err) glog.Error(err)
return false, err return false, err
} }
ci.setIds(m) ci.update(m)
return true, nil return true, nil
} }
func (ci *ClusterId) setIds(m *v1.ConfigMap) { func (ci *ClusterID) update(m *v1.ConfigMap) {
ci.idLock.Lock() ci.idLock.Lock()
defer ci.idLock.Unlock() defer ci.idLock.Unlock()
if clusterId, exists := m.Data[UIDCluster]; exists { if clusterID, exists := m.Data[UIDCluster]; exists {
ci.clusterId = &clusterId ci.clusterID = &clusterID
} }
if provId, exists := m.Data[UIDProvider]; exists { if provId, exists := m.Data[UIDProvider]; exists {
ci.providerId = &provId ci.providerID = &provId
} }
} }

View File

@ -17,11 +17,23 @@ limitations under the License.
package gce package gce
import ( import (
"fmt"
"time" "time"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/master/ports"
utilversion "k8s.io/kubernetes/pkg/util/version"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1" compute "google.golang.org/api/compute/v1"
) )
const (
// minNodesHealthCheckVersion is the version string that
// isAtLeastMinNodesHealthCheckVersion compares node versions against.
minNodesHealthCheckVersion = "1.7.0"
// nodesHealthCheckPath is the HTTP path returned by getNodesHealthCheckPath
// for the nodes health check.
nodesHealthCheckPath = "/healthz"
// lbNodesHealthCheckPort is the port returned by GetNodesHealthCheckPort; it
// is taken from ports.ProxyHealthzPort.
lbNodesHealthCheckPort = ports.ProxyHealthzPort
)
func newHealthcheckMetricContext(request string) *metricContext { func newHealthcheckMetricContext(request string) *metricContext {
return &metricContext{ return &metricContext{
start: time.Now(), start: time.Now(),
@ -178,3 +190,59 @@ func (gce *GCECloud) ListHealthChecks() (*compute.HealthCheckList, error) {
v, err := gce.service.HealthChecks.List(gce.projectID).Do() v, err := gce.service.HealthChecks.List(gce.projectID).Do()
return v, mc.Observe(err) return v, mc.Observe(err)
} }
// GetNodesHealthCheckPort reports the port that the GCE load balancers (l4)
// use when performing health checks on nodes. The value comes from the
// package-level lbNodesHealthCheckPort constant.
func GetNodesHealthCheckPort() int32 {
	const port int32 = lbNodesHealthCheckPort
	return port
}
// getNodesHealthCheckPath reports the HTTP path that the GCE load balancers
// (l4) use when performing health checks on nodes. The value comes from the
// package-level nodesHealthCheckPath constant.
func getNodesHealthCheckPath() string {
	const path string = nodesHealthCheckPath
	return path
}
// makeNodesHealthCheckName builds the name of the health check resource used
// by the GCE load balancers (l4) for performing health checks on nodes. The
// result has the form "k8s-<clusterID>-node".
func makeNodesHealthCheckName(clusterID string) string {
	const nameFormat = "k8s-%v-node"
	return fmt.Sprintf(nameFormat, clusterID)
}
// MakeHealthCheckFirewallName builds the name of the firewall used by the GCE
// load balancers (l4) for performing health checks.
//
// When isNodesHealthCheck is true the name is derived from clusterID (via
// makeNodesHealthCheckName) and hcName is not used; otherwise the name is
// derived from hcName and clusterID is not used. Both forms end in "-http-hc".
func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
	// TODO: Change the names below to match the proposed schema: k8s-{clusterID}-{namespace}-{name}-{shortid}-hc.
	const suffix = "-http-hc"
	if isNodesHealthCheck {
		return makeNodesHealthCheckName(clusterID) + suffix
	}
	return "k8s-" + hcName + suffix
}
// isAtLeastMinNodesHealthCheckVersion reports whether vstring parses to a
// version that is at least `minNodesHealthCheckVersion`.
//
// It returns false, after logging an error, when either
// `minNodesHealthCheckVersion` or vstring cannot be parsed as a version.
func isAtLeastMinNodesHealthCheckVersion(vstring string) bool {
	required, parseErr := utilversion.ParseGeneric(minNodesHealthCheckVersion)
	if parseErr != nil {
		glog.Errorf("MinNodesHealthCheckVersion (%s) is not a valid version string: %v", minNodesHealthCheckVersion, parseErr)
		return false
	}

	actual, parseErr := utilversion.ParseGeneric(vstring)
	if parseErr != nil {
		glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, parseErr)
		return false
	}

	return actual.AtLeast(required)
}
// supportsNodesHealthCheck returns false as soon as it finds a node whose
// reported kube-proxy version does not satisfy
// isAtLeastMinNodesHealthCheckVersion. It returns true when every node
// passes, which includes the case of an empty node list.
func supportsNodesHealthCheck(nodes []*v1.Node) bool {
	for i := range nodes {
		proxyVersion := nodes[i].Status.NodeInfo.KubeProxyVersion
		if isAtLeastMinNodesHealthCheckVersion(proxyVersion) {
			continue
		}
		return false
	}
	return true
}

View File

@ -0,0 +1,123 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"testing"
"k8s.io/kubernetes/pkg/api/v1"
)
// TestIsAtLeastMinNodesHealthCheckVersion feeds a table of version strings to
// isAtLeastMinNodesHealthCheckVersion and reports every case whose result
// differs from the expected one.
func TestIsAtLeastMinNodesHealthCheckVersion(t *testing.T) {
	type versionCase struct {
		version string
		expect  bool
	}

	cases := []versionCase{
		{version: "v1.7.1", expect: true},
		{version: "v1.7.0-alpha.2.597+276d289b90d322", expect: true},
		{version: "v1.6.0-beta.3.472+831q821c907t31a", expect: false},
		{version: "v1.5.2", expect: false},
	}

	for i := range cases {
		got := isAtLeastMinNodesHealthCheckVersion(cases[i].version)
		if got == cases[i].expect {
			continue
		}
		t.Errorf("%v: want %v, got %v", cases[i].version, cases[i].expect, got)
	}
}
// TestSupportsNodesHealthCheck runs supportsNodesHealthCheck against node
// lists where all, none, or all but one of the nodes report a kube-proxy
// version that meets the minimum, and reports every case whose result differs
// from the expected one.
func TestSupportsNodesHealthCheck(t *testing.T) {
	// newNodes builds one Node per kube-proxy version, populating only
	// Status.NodeInfo.KubeProxyVersion.
	newNodes := func(proxyVersions ...string) []*v1.Node {
		nodes := make([]*v1.Node, 0, len(proxyVersions))
		for _, pv := range proxyVersions {
			nodes = append(nodes, &v1.Node{
				Status: v1.NodeStatus{
					NodeInfo: v1.NodeSystemInfo{
						KubeProxyVersion: pv,
					},
				},
			})
		}
		return nodes
	}

	type nodesCase struct {
		desc   string
		nodes  []*v1.Node
		expect bool
	}

	cases := []nodesCase{
		{
			desc:   "All nodes support nodes health check",
			nodes:  newNodes("v1.7.1", "v1.7.0-alpha.2.597+276d289b90d322"),
			expect: true,
		},
		{
			desc:   "All nodes don't support nodes health check",
			nodes:  newNodes("v1.6.0-beta.3.472+831q821c907t31a", "v1.5.2"),
			expect: false,
		},
		{
			desc:   "One node doesn't support nodes health check",
			nodes:  newNodes("v1.7.1", "v1.7.0-alpha.2.597+276d289b90d322", "v1.5.2"),
			expect: false,
		},
	}

	for i := range cases {
		got := supportsNodesHealthCheck(cases[i].nodes)
		if got == cases[i].expect {
			continue
		}
		t.Errorf("%v: want %v, got %v", cases[i].desc, cases[i].expect, got)
	}
}

View File

@ -21,7 +21,6 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -140,6 +139,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
} }
hostNames := nodeNames(nodes) hostNames := nodeNames(nodes)
supportsNodesHealthCheck := supportsNodesHealthCheck(nodes)
hosts, err := gce.getInstancesByNames(hostNames) hosts, err := gce.getInstancesByNames(hostNames)
if err != nil { if err != nil {
return nil, err return nil, err
@ -289,13 +289,13 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
// without needing to be deleted and recreated. // without needing to be deleted and recreated.
if firewallExists { if firewallExists {
glog.Infof("EnsureLoadBalancer(%v(%v)): updating firewall", loadBalancerName, serviceName) glog.Infof("EnsureLoadBalancer(%v(%v)): updating firewall", loadBalancerName, serviceName)
if err := gce.updateFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil { if err := gce.updateFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err return nil, err
} }
glog.Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName) glog.Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName)
} else { } else {
glog.Infof("EnsureLoadBalancer(%v(%v)): creating firewall", loadBalancerName, serviceName) glog.Infof("EnsureLoadBalancer(%v(%v)): creating firewall", loadBalancerName, serviceName)
if err := gce.createFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil { if err := gce.createFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err return nil, err
} }
glog.Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName) glog.Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName)
@ -310,34 +310,43 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name) glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
} }
// Ensure health checks are created for this target pool to pass to createTargetPool for health check links clusterID, err := gce.ClusterID.GetID()
// Alternately, if the service has ExternalTrafficPolicy field set from Local to Global, we need to recreate if err != nil {
// the target pool without health checks. This needs to be prior to the forwarding rule deletion below otherwise return nil, fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err)
// it is not possible to delete just the target pool or http health checks later. }
var hcToCreate *compute.HttpHealthCheck // Check which health check needs to create and which health check needs to delete.
hcExisting, err := gce.GetHttpHealthCheck(loadBalancerName) // Health check management is coupled with target pool operation to prevent leaking.
var hcToCreate, hcToDelete *compute.HttpHealthCheck
hcLocalTrafficExisting, err := gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
return nil, fmt.Errorf("Error checking HTTP health check %s: %v", loadBalancerName, err) return nil, fmt.Errorf("error checking HTTP health check %s: %v", loadBalancerName, err)
} }
if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" { if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
glog.V(4).Infof("service %v (%v) needs health checks on :%d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path) glog.V(4).Infof("service %v (%v) needs local traffic health checks on: %d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path)
if err != nil { if hcLocalTrafficExisting == nil {
// This logic exists to detect a transition for a pre-existing service and turn on // This logic exists to detect a transition for non-OnlyLocal to OnlyLocal service
// the tpNeedsUpdate flag to delete/recreate fwdrule/tpool adding the health check // turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the
// to the target pool. // target pool to use local traffic health check.
glog.V(2).Infof("ExternalTrafficPolicy field set to Local on new or pre-existing service") glog.V(2).Infof("Updating from nodes health checks to local traffic health checks for service %v LB %v", apiService.Name, loadBalancerName)
if supportsNodesHealthCheck {
hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
tpNeedsUpdate = true tpNeedsUpdate = true
} }
hcToCreate, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort) hcToCreate = makeHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
if err != nil {
return nil, fmt.Errorf("Failed to ensure health check for localized service %v on node port %v: %v", loadBalancerName, healthCheckNodePort, err)
}
} else { } else {
glog.V(4).Infof("service %v does not need health checks", apiService.Name) glog.V(4).Infof("Service %v needs nodes health checks.", apiService.Name)
if err == nil { if hcLocalTrafficExisting != nil {
glog.V(2).Infof("Deleting stale health checks for service %v LB %v", apiService.Name, loadBalancerName) // This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service
// and turn on the tpNeedsUpdate flag to delete/recreate fwdrule/tpool updating the
// target pool to use nodes health check.
glog.V(2).Infof("Updating from local traffic health checks to nodes health checks for service %v LB %v", apiService.Name, loadBalancerName)
hcToDelete = hcLocalTrafficExisting
tpNeedsUpdate = true tpNeedsUpdate = true
} }
if supportsNodesHealthCheck {
hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
} }
// Now we get to some slightly more interesting logic. // Now we get to some slightly more interesting logic.
// First, neither target pools nor forwarding rules can be updated in place - // First, neither target pools nor forwarding rules can be updated in place -
@ -357,17 +366,12 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName) glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
} }
if tpExists && tpNeedsUpdate { if tpExists && tpNeedsUpdate {
// Generate the list of health checks for this target pool to pass to deleteTargetPool // Pass healthchecks to deleteTargetPool to cleanup health checks after cleaning up the target pool itself.
if path, _ := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" { var hcNames []string
var err error if hcToDelete != nil {
hcExisting, err = gce.GetHttpHealthCheck(loadBalancerName) hcNames = append(hcNames, hcToDelete.Name)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
} }
} if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil {
// Pass healthchecks to deleteTargetPool to cleanup health checks prior to cleaning up the target pool itself.
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcExisting); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err) return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
} }
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName) glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
@ -381,11 +385,11 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
createInstances = createInstances[:maxTargetPoolCreateInstances] createInstances = createInstances[:maxTargetPoolCreateInstances]
} }
// Pass healthchecks to createTargetPool which needs them as health check links in the target pool // Pass healthchecks to createTargetPool which needs them as health check links in the target pool
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType, hcToCreate); err != nil { if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddress, gce.region, createInstances, affinityType, hcToCreate); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err) return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
} }
if hcToCreate != nil { if hcToCreate != nil {
glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks for target pool", loadBalancerName, serviceName) glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks %v for target pool", loadBalancerName, serviceName, hcToCreate.Name)
} }
if len(hosts) <= maxTargetPoolCreateInstances { if len(hosts) <= maxTargetPoolCreateInstances {
glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName) glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
@ -447,18 +451,29 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, loadBalancerName, glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, loadBalancerName,
gce.region) gce.region)
var hc *compute.HttpHealthCheck var hcNames []string
if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" { if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" {
var err error hcToDelete, err := gce.GetHttpHealthCheck(loadBalancerName)
hc, err = gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) { if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err) glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
return err return err
} }
hcNames = append(hcNames, hcToDelete.Name)
} else {
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err)
}
// EnsureLoadBalancerDeleted() could be triggered by changing service from
// LoadBalancer type to others. In this case we have no idea whether it was
// using local traffic health check or nodes health check. Attempt to delete
// both to prevent leaking.
hcNames = append(hcNames, loadBalancerName)
hcNames = append(hcNames, makeNodesHealthCheckName(clusterID))
} }
errs := utilerrors.AggregateGoroutines( errs := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(loadBalancerName, gce.region) }, func() error { return gce.deleteFirewall(makeFirewallName(loadBalancerName), gce.region) },
// Even though we don't hold on to static IPs for load balancers, it's // Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureLoadBalancer left one around in a failed // possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case. // creation/update attempt, so make sure we clean it up here just in case.
@ -469,7 +484,7 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil { if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return err return err
} }
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hc); err != nil { if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil {
return err return err
} }
return nil return nil
@ -509,15 +524,15 @@ func (gce *GCECloud) deleteForwardingRule(name, region string) error {
} }
// DeleteTargetPool deletes the given target pool. // DeleteTargetPool deletes the given target pool.
func (gce *GCECloud) DeleteTargetPool(name string, hc *compute.HttpHealthCheck) error { func (gce *GCECloud) DeleteTargetPool(name string, hcNames ...string) error {
region, err := GetGCERegion(gce.localZone) region, err := GetGCERegion(gce.localZone)
if err != nil { if err != nil {
return err return err
} }
return gce.deleteTargetPool(name, region, hc) return gce.deleteTargetPool(name, region, hcNames...)
} }
func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error { func (gce *GCECloud) deleteTargetPool(name, region string, hcNames ...string) error {
mc := newTargetPoolMetricContext("delete", region) mc := newTargetPoolMetricContext("delete", region)
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do() op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
@ -535,41 +550,86 @@ func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealt
} }
// Deletion of health checks is allowed only after the TargetPool reference is deleted // Deletion of health checks is allowed only after the TargetPool reference is deleted
if hc != nil { for _, hcName := range hcNames {
glog.Infof("Deleting health check %v", hc.Name) if err = func() error {
if err := gce.DeleteHttpHealthCheck(hc.Name); err != nil { // Check whether it is nodes health check, which has different name from the load-balancer.
glog.Warningf("Failed to delete health check %v: %v", hc, err) isNodesHealthCheck := hcName != name
if isNodesHealthCheck {
// Lock to prevent deleting necessary nodes health check before it gets attached
// to target pool.
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
}
glog.Infof("Deleting health check %v", hcName)
if err := gce.DeleteHttpHealthCheck(hcName); err != nil {
// Delete nodes health checks will fail if any other target pool is using it.
if isInUsedByError(err) {
glog.V(4).Infof("Health check %v is in used: %v.", hcName, err)
return nil
} else if !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Warningf("Failed to delete health check %v: %v", hcName, err)
return err return err
} }
} else { // StatusNotFound could happen when:
// This is a HC cleanup attempt to prevent stale HCs when errors are encountered // - This is the first attempt but we pass in a healthcheck that is already deleted
// during HC deletion in a prior pass through EnsureLoadBalancer. // to prevent leaking.
// The HC name matches the load balancer name - normally this is expected to fail. // - This is the first attempt but user manually deleted the healthcheck.
if err := gce.DeleteHttpHealthCheck(name); err == nil { // - This is a retry and in previous round we failed to delete the healthcheck firewall
// We only print a warning if this deletion actually succeeded (which // after deleted the healthcheck.
// means there was indeed a stale health check with the LB name. // We continue to delete the healthcheck firewall to prevent leaking.
glog.Warningf("Deleted stale http health check for LB: %s", name) glog.V(4).Infof("Health check %v is already deleted.", hcName)
}
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return fmt.Errorf("error getting cluster ID: %v", err)
}
// If health check is deleted without error, it means no load-balancer is using it.
// So we should delete the health check firewall as well.
fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck)
glog.Infof("Deleting firewall %v.", fwName)
if err := gce.DeleteFirewall(fwName); err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
glog.V(4).Infof("Firewall %v is already deleted.", fwName)
return nil
}
return err
}
return nil
}(); err != nil {
return err
} }
} }
return nil return nil
} }
func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error { func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
// health check management is coupled with targetPools to prevent leaks. A // health check management is coupled with targetPools to prevent leaks. A
// target pool is the only thing that requires a health check, so we delete // target pool is the only thing that requires a health check, so we delete
// associated checks on teardown, and ensure checks on setup. // associated checks on teardown, and ensure checks on setup.
hcLinks := []string{} hcLinks := []string{}
if hc != nil { if hc != nil {
// Check whether it is nodes health check, which has different name from the load-balancer.
isNodesHealthCheck := hc.Name != name
if isNodesHealthCheck {
// Lock to prevent necessary nodes health check / firewall gets deleted.
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
}
if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, gce.region, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil {
return err
}
var err error var err error
if hc, err = gce.ensureHttpHealthCheck(name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil { if hc, err = gce.ensureHttpHealthCheck(hc.Name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil {
return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hc.Port, hc.RequestPath, err) return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hc.Port, hc.RequestPath, err)
} }
hcLinks = append(hcLinks, hc.SelfLink) hcLinks = append(hcLinks, hc.SelfLink)
} }
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
glog.Infof("Creating targetpool %v with %d healthchecks", name, len(hcLinks)) glog.Infof("Creating targetpool %v with %d healthchecks", name, len(hcLinks))
pool := &compute.TargetPool{ pool := &compute.TargetPool{
Name: name, Name: name,
@ -651,8 +711,8 @@ func (gce *GCECloud) targetPoolURL(name, region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name) return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
} }
func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) { func makeHttpHealthCheck(name, path string, port int32) *compute.HttpHealthCheck {
newHC := &compute.HttpHealthCheck{ return &compute.HttpHealthCheck{
Name: name, Name: name,
Port: int64(port), Port: int64(port),
RequestPath: path, RequestPath: path,
@ -663,7 +723,10 @@ func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *c
HealthyThreshold: gceHcHealthyThreshold, HealthyThreshold: gceHcHealthyThreshold,
UnhealthyThreshold: gceHcUnhealthyThreshold, UnhealthyThreshold: gceHcUnhealthyThreshold,
} }
}
func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) {
newHC := makeHttpHealthCheck(name, path, port)
hc, err = gce.GetHttpHealthCheck(name) hc, err = gce.GetHttpHealthCheck(name)
if hc == nil || err != nil && isHTTPErrorCode(err, http.StatusNotFound) { if hc == nil || err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path) glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path)
@ -843,7 +906,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st
if isHTTPErrorCode(err, http.StatusNotFound) { if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil return false, true, nil
} }
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err) return false, false, fmt.Errorf("error getting load balancer's firewall: %v", err)
} }
if fw.Description != makeFirewallDescription(serviceName, ipAddress) { if fw.Description != makeFirewallDescription(serviceName, ipAddress) {
return true, true, nil return true, true, nil
@ -856,7 +919,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st
for ix := range ports { for ix := range ports {
allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port)) allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port))
} }
if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) { if !equalStringSets(allowedPorts, fw.Allowed[0].Ports) {
return true, true, nil return true, true, nil
} }
// The service controller already verified that the protocol matches on all ports, no need to check. // The service controller already verified that the protocol matches on all ports, no need to check.
@ -875,18 +938,47 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st
return true, false, nil return true, false, nil
} }
func slicesEqual(x, y []string) bool { func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, region string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error {
if len(x) != len(y) { clusterID, err := gce.ClusterID.GetID()
return false if err != nil {
return fmt.Errorf("error getting cluster ID: %v", err)
} }
sort.Strings(x)
sort.Strings(y) // Prepare the firewall params for creating / checking.
for i := range x { desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID)
if x[i] != y[i] { if !isNodesHealthCheck {
return false desc = makeFirewallDescription(serviceName, ipAddress)
} }
sourceRanges := lbSrcRngsFlag.ipn
ports := []v1.ServicePort{{Protocol: "tcp", Port: hcPort}}
fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck)
fw, err := gce.service.Firewalls.Get(gce.projectID, fwName).Do()
if err != nil {
if !isHTTPErrorCode(err, http.StatusNotFound) {
return fmt.Errorf("error getting firewall for health checks: %v", err)
} }
return true glog.Infof("Creating firewall %v for health checks.", fwName)
if err := gce.createFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
return err
}
glog.Infof("Created firewall %v for health checks.", fwName)
return nil
}
// Validate firewall fields.
if fw.Description != desc ||
len(fw.Allowed) != 1 ||
fw.Allowed[0].IPProtocol != string(ports[0].Protocol) ||
!equalStringSets(fw.Allowed[0].Ports, []string{string(ports[0].Port)}) ||
!equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) {
glog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName)
if err := gce.updateFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
glog.Warningf("Failed to reconcile firewall %v parameters.", fwName)
return err
}
glog.V(4).Infof("Corrected firewall %v parameters successful", fwName)
}
return nil
} }
func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []v1.ServicePort) error { func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []v1.ServicePort) error {
@ -942,7 +1034,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges nets
if err != nil { if err != nil {
return mc.Observe(err) return mc.Observe(err)
} }
op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do() op, err := gce.service.Firewalls.Update(gce.projectID, name, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) { if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return mc.Observe(err) return mc.Observe(err)
} }
@ -971,7 +1063,7 @@ func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges nets
} }
firewall := &compute.Firewall{ firewall := &compute.Firewall{
Name: makeFirewallName(name), Name: name,
Description: desc, Description: desc,
Network: gce.networkURL, Network: gce.networkURL,
SourceRanges: sourceRanges.StringSlice(), SourceRanges: sourceRanges.StringSlice(),
@ -1141,17 +1233,16 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
func (gce *GCECloud) deleteFirewall(name, region string) error { func (gce *GCECloud) deleteFirewall(name, region string) error {
mc := newFirewallMetricContext("delete", region) mc := newFirewallMetricContext("delete", region)
fwName := makeFirewallName(name) op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do()
op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) { if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.V(2).Infof("Firewall %s already deleted. Continuing to delete other resources.", name) glog.V(2).Infof("Firewall %s already deleted. Continuing to delete other resources.", name)
} else if err != nil { } else if err != nil {
glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err) glog.Warningf("Failed to delete firewall %s, got error %v", name, err)
return mc.Observe(err) return mc.Observe(err)
} else { } else {
if err := gce.waitForGlobalOp(op, mc); err != nil { if err := gce.waitForGlobalOp(op, mc); err != nil {
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err) glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", name, err)
return err return err
} }
} }

View File

@ -19,10 +19,12 @@ package gce
import ( import (
"errors" "errors"
"fmt" "fmt"
"net/http"
"regexp" "regexp"
"strings" "strings"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"cloud.google.com/go/compute/metadata" "cloud.google.com/go/compute/metadata"
compute "google.golang.org/api/compute/v1" compute "google.golang.org/api/compute/v1"
@ -105,6 +107,14 @@ func isHTTPErrorCode(err error, code int) bool {
return ok && apiErr.Code == code return ok && apiErr.Code == code
} }
// isInUsedByError reports whether err is a *googleapi.Error carrying HTTP
// status 400 (Bad Request) whose message contains the phrase "being used by".
// Any other error, including a non-googleapi error, yields false.
func isInUsedByError(err error) bool {
	if apiErr, ok := err.(*googleapi.Error); ok && apiErr.Code == http.StatusBadRequest {
		return strings.Contains(apiErr.Message, "being used by")
	}
	return false
}
// splitProviderID splits a provider's id into core components. // splitProviderID splits a provider's id into core components.
// A providerID is build out of '${ProviderName}://${project-id}/${zone}/${instance-name}' // A providerID is build out of '${ProviderName}://${project-id}/${zone}/${instance-name}'
// See cloudprovider.GetInstanceProviderID. // See cloudprovider.GetInstanceProviderID.
@ -115,3 +125,12 @@ func splitProviderID(providerID string) (project, zone, instance string, err err
} }
return matches[1], matches[2], matches[3], nil return matches[1], matches[2], matches[3], nil
} }
// equalStringSets reports whether x and y have the same length and contain
// the same set of distinct strings. Element order is ignored.
func equalStringSets(x, y []string) bool {
	// The length guard runs first so the sets are only built when the
	// slices could possibly match.
	return len(x) == len(y) && sets.NewString(x...).Equal(sets.NewString(y...))
}

View File

@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/cloudprovider"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/master/ports" "k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
@ -45,13 +46,18 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
gceCloud = cloudConfig.Provider.(*gcecloud.GCECloud) gceCloud = cloudConfig.Provider.(*gcecloud.GCECloud)
}) })
// This test takes around 4 minutes to run // This test takes around 6 minutes to run
It("[Slow] [Serial] should create valid firewall rules for LoadBalancer type service", func() { It("[Slow] [Serial] should create valid firewall rules for LoadBalancer type service", func() {
ns := f.Namespace.Name ns := f.Namespace.Name
// This source ranges is just used to examine we have exact same things on LB firewall rules // This source ranges is just used to examine we have exact same things on LB firewall rules
firewallTestSourceRanges := []string{"0.0.0.0/1", "128.0.0.0/1"} firewallTestSourceRanges := []string{"0.0.0.0/1", "128.0.0.0/1"}
serviceName := "firewall-test-loadbalancer" serviceName := "firewall-test-loadbalancer"
By("Getting cluster ID")
clusterID, err := framework.GetClusterID(cs)
Expect(err).NotTo(HaveOccurred())
framework.Logf("Got cluster ID: %v", clusterID)
jig := framework.NewServiceTestJig(cs, serviceName) jig := framework.NewServiceTestJig(cs, serviceName)
nodesNames := jig.GetNodesNames(framework.MaxNodesForEndpointsTests) nodesNames := jig.GetNodesNames(framework.MaxNodesForEndpointsTests)
if len(nodesNames) <= 0 { if len(nodesNames) <= 0 {
@ -59,10 +65,8 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
} }
nodesSet := sets.NewString(nodesNames...) nodesSet := sets.NewString(nodesNames...)
// OnlyLocal service is needed to examine which exact nodes the requests are being forwarded to by the Load Balancer on GCE By("Creating a LoadBalancer type service with ExternalTrafficPolicy=Global")
By("Creating a LoadBalancer type service with ExternalTrafficPolicy=Local") svc := jig.CreateLoadBalancerService(ns, serviceName, framework.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) {
svc := jig.CreateOnlyLocalLoadBalancerService(ns, serviceName,
framework.LoadBalancerCreateTimeoutDefault, false, func(svc *v1.Service) {
svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: framework.FirewallTestHttpPort}} svc.Spec.Ports = []v1.ServicePort{{Protocol: "TCP", Port: framework.FirewallTestHttpPort}}
svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges svc.Spec.LoadBalancerSourceRanges = firewallTestSourceRanges
}) })
@ -72,15 +76,41 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
svc.Spec.LoadBalancerSourceRanges = nil svc.Spec.LoadBalancerSourceRanges = nil
}) })
Expect(cs.Core().Services(svc.Namespace).Delete(svc.Name, nil)).NotTo(HaveOccurred()) Expect(cs.Core().Services(svc.Namespace).Delete(svc.Name, nil)).NotTo(HaveOccurred())
By("Waiting for the local traffic health check firewall rule to be deleted")
localHCFwName := framework.MakeHealthCheckFirewallNameForLBService(clusterID, cloudprovider.GetLoadBalancerName(svc), false)
_, err := framework.WaitForFirewallRule(gceCloud, localHCFwName, false, framework.LoadBalancerCleanupTimeout)
Expect(err).NotTo(HaveOccurred())
}() }()
svcExternalIP := svc.Status.LoadBalancer.Ingress[0].IP svcExternalIP := svc.Status.LoadBalancer.Ingress[0].IP
By("Checking if service's firewall rules are correct") By("Checking if service's firewall rule is correct")
nodeTags := framework.GetInstanceTags(cloudConfig, nodesNames[0]) nodeTags := framework.GetInstanceTags(cloudConfig, nodesNames[0])
expFw := framework.ConstructFirewallForLBService(svc, nodeTags.Items) lbFw := framework.ConstructFirewallForLBService(svc, nodeTags.Items)
fw, err := gceCloud.GetFirewall(expFw.Name) fw, err := gceCloud.GetFirewall(lbFw.Name)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(framework.VerifyFirewallRule(fw, expFw, cloudConfig.Network, false)).NotTo(HaveOccurred()) Expect(framework.VerifyFirewallRule(fw, lbFw, cloudConfig.Network, false)).NotTo(HaveOccurred())
By("Checking if service's nodes health check firewall rule is correct")
nodesHCFw := framework.ConstructHealthCheckFirewallForLBService(clusterID, svc, nodeTags.Items, true)
fw, err = gceCloud.GetFirewall(nodesHCFw.Name)
Expect(err).NotTo(HaveOccurred())
Expect(framework.VerifyFirewallRule(fw, nodesHCFw, cloudConfig.Network, false)).NotTo(HaveOccurred())
// OnlyLocal service is needed to examine which exact nodes the requests are being forwarded to by the Load Balancer on GCE
By("Updating LoadBalancer service to ExternalTrafficPolicy=Local")
svc = jig.UpdateServiceOrFail(svc.Namespace, svc.Name, func(svc *v1.Service) {
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal
})
By("Waiting for the nodes health check firewall rule to be deleted")
_, err = framework.WaitForFirewallRule(gceCloud, nodesHCFw.Name, false, framework.LoadBalancerCleanupTimeout)
Expect(err).NotTo(HaveOccurred())
By("Waiting for the correct local traffic health check firewall rule to be created")
localHCFw := framework.ConstructHealthCheckFirewallForLBService(clusterID, svc, nodeTags.Items, false)
fw, err = framework.WaitForFirewallRule(gceCloud, localHCFw.Name, true, framework.LoadBalancerCreateTimeoutDefault)
Expect(err).NotTo(HaveOccurred())
Expect(framework.VerifyFirewallRule(fw, localHCFw, cloudConfig.Network, false)).NotTo(HaveOccurred())
By(fmt.Sprintf("Creating netexec pods on at most %v nodes", framework.MaxNodesForEndpointsTests)) By(fmt.Sprintf("Creating netexec pods on at most %v nodes", framework.MaxNodesForEndpointsTests))
for i, nodeName := range nodesNames { for i, nodeName := range nodesNames {
@ -100,7 +130,7 @@ var _ = framework.KubeDescribe("Firewall rule", func() {
// by removing the tag on one vm and make sure it doesn't get any traffic. This is an imperfect // by removing the tag on one vm and make sure it doesn't get any traffic. This is an imperfect
// simulation, we really want to check that traffic doesn't reach a vm outside the GKE cluster, but // simulation, we really want to check that traffic doesn't reach a vm outside the GKE cluster, but
// that's much harder to do in the current e2e framework. // that's much harder to do in the current e2e framework.
By("Removing tags from one of the nodes") By(fmt.Sprintf("Removing tags from one of the nodes: %v", nodesNames[0]))
nodesSet.Delete(nodesNames[0]) nodesSet.Delete(nodesNames[0])
removedTags := framework.SetInstanceTags(cloudConfig, nodesNames[0], []string{}) removedTags := framework.SetInstanceTags(cloudConfig, nodesNames[0], []string{})
defer func() { defer func() {

View File

@ -45,6 +45,7 @@ go_library(
"//pkg/api/v1/helper:go_default_library", "//pkg/api/v1/helper:go_default_library",
"//pkg/api/v1/node:go_default_library", "//pkg/api/v1/node:go_default_library",
"//pkg/api/v1/pod:go_default_library", "//pkg/api/v1/pod:go_default_library",
"//pkg/api/v1/service:go_default_library",
"//pkg/apis/apps/v1beta1:go_default_library", "//pkg/apis/apps/v1beta1:go_default_library",
"//pkg/apis/authorization/v1beta1:go_default_library", "//pkg/apis/authorization/v1beta1:go_default_library",
"//pkg/apis/batch:go_default_library", "//pkg/apis/batch:go_default_library",

View File

@ -18,12 +18,16 @@ package framework
import ( import (
"fmt" "fmt"
"net/http"
"strconv" "strconv"
"strings" "strings"
"time" "time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
apiservice "k8s.io/kubernetes/pkg/api/v1/service"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
@ -41,7 +45,7 @@ const (
) )
// MakeFirewallNameForLBService return the expected firewall name for a LB service. // MakeFirewallNameForLBService return the expected firewall name for a LB service.
// This should match the formatting of makeFirewallName() in pkg/cloudprovider/providers/gce/gce.go // This should match the formatting of makeFirewallName() in pkg/cloudprovider/providers/gce/gce_loadbalancer.go
func MakeFirewallNameForLBService(name string) string { func MakeFirewallNameForLBService(name string) string {
return fmt.Sprintf("k8s-fw-%s", name) return fmt.Sprintf("k8s-fw-%s", name)
} }
@ -68,6 +72,32 @@ func ConstructFirewallForLBService(svc *v1.Service, nodesTags []string) *compute
return &fw return &fw
} }
// MakeHealthCheckFirewallNameForLBService returns the expected name of the
// health check firewall rule for a LB service. It delegates directly to
// gcecloud.MakeHealthCheckFirewallName with the same arguments, so the result
// always matches the name the GCE cloud provider uses.
func MakeHealthCheckFirewallNameForLBService(clusterID, name string, isNodesHealthCheck bool) string {
return gcecloud.MakeHealthCheckFirewallName(clusterID, name, isNodesHealthCheck)
}
// ConstructHealthCheckFirewallForLBService builds the GCE firewall rule that
// is expected to guard health check traffic for a LoadBalancer type service.
//
// The rule targets nodesTags, uses gcecloud.LoadBalancerSrcRanges() as its
// source ranges and allows a single TCP port: the nodes health check port
// when isNodesHealthCheck is true, otherwise the service's health check node
// port. Failf is invoked when svc is not of type LoadBalancer.
func ConstructHealthCheckFirewallForLBService(clusterID string, svc *v1.Service, nodesTags []string, isNodesHealthCheck bool) *compute.Firewall {
	if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
		Failf("can not construct firewall rule for non-loadbalancer type service")
	}

	lbName := cloudprovider.GetLoadBalancerName(svc)
	expected := &compute.Firewall{
		Name:         MakeHealthCheckFirewallNameForLBService(clusterID, lbName, isNodesHealthCheck),
		TargetTags:   nodesTags,
		SourceRanges: gcecloud.LoadBalancerSrcRanges(),
	}

	// Default to the shared nodes health check port; a per-service (local
	// traffic) health check uses the service's own health check node port.
	port := gcecloud.GetNodesHealthCheckPort()
	if !isNodesHealthCheck {
		port = apiservice.GetServiceHealthCheckNodePort(svc)
	}

	allowed := &compute.FirewallAllowed{
		IPProtocol: "tcp",
		Ports:      []string{fmt.Sprintf("%d", port)},
	}
	expected.Allowed = []*compute.FirewallAllowed{allowed}
	return expected
}
// GetNodeTags gets tags from one of the Kubernetes nodes // GetNodeTags gets tags from one of the Kubernetes nodes
func GetNodeTags(c clientset.Interface, cloudConfig CloudConfig) *compute.Tags { func GetNodeTags(c clientset.Interface, cloudConfig CloudConfig) *compute.Tags {
nodes := GetReadySchedulableNodesOrDie(c) nodes := GetReadySchedulableNodesOrDie(c)
@ -303,6 +333,9 @@ func SameStringArray(result, expected []string, include bool) error {
// VerifyFirewallRule verifies whether the result firewall is consistent with the expected firewall. // VerifyFirewallRule verifies whether the result firewall is consistent with the expected firewall.
// When `portsSubset` is false, match given ports exactly. Otherwise, only check ports are included. // When `portsSubset` is false, match given ports exactly. Otherwise, only check ports are included.
func VerifyFirewallRule(res, exp *compute.Firewall, network string, portsSubset bool) error { func VerifyFirewallRule(res, exp *compute.Firewall, network string, portsSubset bool) error {
if res == nil || exp == nil {
return fmt.Errorf("res and exp must not be nil")
}
if res.Name != exp.Name { if res.Name != exp.Name {
return fmt.Errorf("incorrect name: %v, expected %v", res.Name, exp.Name) return fmt.Errorf("incorrect name: %v, expected %v", res.Name, exp.Name)
} }
@ -325,3 +358,40 @@ func VerifyFirewallRule(res, exp *compute.Firewall, network string, portsSubset
} }
return nil return nil
} }
// WaitForFirewallRule polls GCE every 5 seconds, for up to timeout, until the
// firewall rule named fwName reaches the requested state.
//
// When exist is true the wait succeeds as soon as GetFirewall returns without
// error, and the fetched rule is returned. When exist is false the wait
// succeeds once GetFirewall fails with HTTP 404 (Not Found); any other error
// is treated as "not yet" and polling continues.
//
// On failure the returned error includes both the polling error and the last
// error seen from GetFirewall, so a timeout can be told apart from a
// persistent API failure.
func WaitForFirewallRule(gceCloud *gcecloud.GCECloud, fwName string, exist bool, timeout time.Duration) (*compute.Firewall, error) {
	Logf("Waiting up to %v for firewall %v exist=%v", timeout, fwName, exist)

	var fw *compute.Firewall
	// lastErr remembers the most recent GetFirewall outcome for diagnostics.
	var lastErr error
	condition := func() (bool, error) {
		fw, lastErr = gceCloud.GetFirewall(fwName)
		if exist {
			return lastErr == nil, nil
		}
		// Waiting for deletion: only a definitive NotFound counts as done.
		return lastErr != nil && IsGoogleAPIHTTPErrorCode(lastErr, http.StatusNotFound), nil
	}

	if err := wait.PollImmediate(5*time.Second, timeout, condition); err != nil {
		return nil, fmt.Errorf("error waiting for firewall %v exist=%v: %v (last GetFirewall error: %v)", fwName, exist, err, lastErr)
	}
	return fw, nil
}
// GetClusterID reads the config map named gcecloud.UIDConfigMapName from the
// system namespace and returns the ID stored in it.
//
// The value under gcecloud.UIDProvider is preferred when present; otherwise
// the value under gcecloud.UIDCluster is returned. An error is returned when
// the config map cannot be fetched or when the gcecloud.UIDCluster key is
// missing (even if a provider ID is present).
func GetClusterID(c clientset.Interface) (string, error) {
	uidConfig, err := c.Core().ConfigMaps(metav1.NamespaceSystem).Get(gcecloud.UIDConfigMapName, metav1.GetOptions{})
	if err != nil || uidConfig == nil {
		return "", fmt.Errorf("error getting cluster ID: %v", err)
	}

	clusterID, found := uidConfig.Data[gcecloud.UIDCluster]
	if !found {
		return "", fmt.Errorf("cluster ID not set")
	}
	if providerID, found := uidConfig.Data[gcecloud.UIDProvider]; found {
		return providerID, nil
	}
	return clusterID, nil
}

View File

@ -235,6 +235,25 @@ func (j *ServiceTestJig) CreateOnlyLocalLoadBalancerService(namespace, serviceNa
return svc return svc
} }
// CreateLoadBalancerService creates a TCP service of type LoadBalancer with
// session affinity disabled, applies the optional tweak callback to it, then
// waits up to timeout for the load balancer and sanity-checks the result.
// The service returned is the one obtained after the wait.
func (j *ServiceTestJig) CreateLoadBalancerService(namespace, serviceName string, timeout time.Duration, tweak func(svc *v1.Service)) *v1.Service {
	qualifiedName := namespace + "/" + serviceName

	By("creating a service " + qualifiedName + " with type=LoadBalancer")
	configure := func(s *v1.Service) {
		s.Spec.Type = v1.ServiceTypeLoadBalancer
		// We need to turn affinity off for our LB distribution tests
		s.Spec.SessionAffinity = v1.ServiceAffinityNone
		if tweak != nil {
			tweak(s)
		}
	}
	// The freshly created object is superseded by the one fetched after the
	// load balancer is ready, so its return value is not kept.
	j.CreateTCPServiceOrFail(namespace, configure)

	By("waiting for loadbalancer for service " + qualifiedName)
	ready := j.WaitForLoadBalancerOrFail(namespace, serviceName, timeout)
	j.SanityCheckService(ready, v1.ServiceTypeLoadBalancer)
	return ready
}
func GetNodeAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) { func GetNodeAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) {
for j := range node.Status.Addresses { for j := range node.Status.Addresses {
nodeAddress := &node.Status.Addresses[j] nodeAddress := &node.Status.Addresses[j]

View File

@ -5181,12 +5181,16 @@ func CleanupGCEResources(loadBalancerName string) (retErr error) {
if err := DeleteGCEStaticIP(loadBalancerName); err != nil { if err := DeleteGCEStaticIP(loadBalancerName); err != nil {
Logf("%v", err) Logf("%v", err)
} }
var hcNames []string
hc, getErr := gceCloud.GetHttpHealthCheck(loadBalancerName) hc, getErr := gceCloud.GetHttpHealthCheck(loadBalancerName)
if getErr != nil && !IsGoogleAPIHTTPErrorCode(getErr, http.StatusNotFound) { if getErr != nil && !IsGoogleAPIHTTPErrorCode(getErr, http.StatusNotFound) {
retErr = fmt.Errorf("%v\n%v", retErr, getErr) retErr = fmt.Errorf("%v\n%v", retErr, getErr)
return return
} }
if err := gceCloud.DeleteTargetPool(loadBalancerName, hc); err != nil && if hc != nil {
hcNames = append(hcNames, hc.Name)
}
if err := gceCloud.DeleteTargetPool(loadBalancerName, hcNames...); err != nil &&
!IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) { !IsGoogleAPIHTTPErrorCode(err, http.StatusNotFound) {
retErr = fmt.Errorf("%v\n%v", retErr, err) retErr = fmt.Errorf("%v\n%v", retErr, err)
} }