Add Amazon NLB support

This commit is contained in:
Micah Hausler 2017-11-10 11:28:26 -05:00
parent 142579c16d
commit f9445b9dc7
4 changed files with 1256 additions and 0 deletions

View File

@ -38,6 +38,7 @@ import (
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/aws/aws-sdk-go/service/kms"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
@ -61,6 +62,18 @@ import (
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
// NLBHealthCheckRuleDescription is the comment used on a security group rule to
// indicate that it is used for health checks
const NLBHealthCheckRuleDescription = "kubernetes.io/rule/nlb/health"
// NLBClientRuleDescription is the comment used on a security group rule to
// indicate that it is used for client traffic
const NLBClientRuleDescription = "kubernetes.io/rule/nlb/client"
// NLBMtuDiscoveryRuleDescription is the comment used on a security group rule
// to indicate that it is used for mtu discovery
const NLBMtuDiscoveryRuleDescription = "kubernetes.io/rule/nlb/mtu"
// ProviderName is the name of this cloud provider.
const ProviderName = "aws"
@ -76,6 +89,11 @@ const TagNameSubnetInternalELB = "kubernetes.io/role/internal-elb"
// it should be used for internet ELBs
const TagNameSubnetPublicELB = "kubernetes.io/role/elb"
// ServiceAnnotationLoadBalancerType is the annotation used on the service
// to indicate what type of Load Balancer we want. Right now, the only accepted
// value is "nlb"
const ServiceAnnotationLoadBalancerType = "service.beta.kubernetes.io/aws-load-balancer-type"
// ServiceAnnotationLoadBalancerInternal is the annotation used on the service
// to indicate that we want an internal ELB.
const ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/aws-load-balancer-internal"
@ -224,6 +242,7 @@ var _ cloudprovider.PVLabeler = (*Cloud)(nil)
type Services interface {
Compute(region string) (EC2, error)
LoadBalancing(region string) (ELB, error)
LoadBalancingV2(region string) (ELBV2, error)
Autoscaling(region string) (ASG, error)
Metadata() (EC2Metadata, error)
KeyManagement(region string) (KMS, error)
@ -264,6 +283,8 @@ type EC2 interface {
DeleteRoute(request *ec2.DeleteRouteInput) (*ec2.DeleteRouteOutput, error)
ModifyInstanceAttribute(request *ec2.ModifyInstanceAttributeInput) (*ec2.ModifyInstanceAttributeOutput, error)
DescribeVpcs(input *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error)
}
// ELB is a simple pass-through of AWS' ELB client interface, which allows for testing
@ -293,6 +314,38 @@ type ELB interface {
ModifyLoadBalancerAttributes(*elb.ModifyLoadBalancerAttributesInput) (*elb.ModifyLoadBalancerAttributesOutput, error)
}
// ELBV2 is a simple pass-through of AWS' ELBV2 client interface, which allows for testing
type ELBV2 interface {
AddTags(input *elbv2.AddTagsInput) (*elbv2.AddTagsOutput, error)
CreateLoadBalancer(*elbv2.CreateLoadBalancerInput) (*elbv2.CreateLoadBalancerOutput, error)
DescribeLoadBalancers(*elbv2.DescribeLoadBalancersInput) (*elbv2.DescribeLoadBalancersOutput, error)
DeleteLoadBalancer(*elbv2.DeleteLoadBalancerInput) (*elbv2.DeleteLoadBalancerOutput, error)
ModifyLoadBalancerAttributes(*elbv2.ModifyLoadBalancerAttributesInput) (*elbv2.ModifyLoadBalancerAttributesOutput, error)
DescribeLoadBalancerAttributes(*elbv2.DescribeLoadBalancerAttributesInput) (*elbv2.DescribeLoadBalancerAttributesOutput, error)
CreateTargetGroup(*elbv2.CreateTargetGroupInput) (*elbv2.CreateTargetGroupOutput, error)
DescribeTargetGroups(*elbv2.DescribeTargetGroupsInput) (*elbv2.DescribeTargetGroupsOutput, error)
ModifyTargetGroup(*elbv2.ModifyTargetGroupInput) (*elbv2.ModifyTargetGroupOutput, error)
DeleteTargetGroup(*elbv2.DeleteTargetGroupInput) (*elbv2.DeleteTargetGroupOutput, error)
DescribeTargetHealth(input *elbv2.DescribeTargetHealthInput) (*elbv2.DescribeTargetHealthOutput, error)
DescribeTargetGroupAttributes(*elbv2.DescribeTargetGroupAttributesInput) (*elbv2.DescribeTargetGroupAttributesOutput, error)
ModifyTargetGroupAttributes(*elbv2.ModifyTargetGroupAttributesInput) (*elbv2.ModifyTargetGroupAttributesOutput, error)
RegisterTargets(*elbv2.RegisterTargetsInput) (*elbv2.RegisterTargetsOutput, error)
DeregisterTargets(*elbv2.DeregisterTargetsInput) (*elbv2.DeregisterTargetsOutput, error)
CreateListener(*elbv2.CreateListenerInput) (*elbv2.CreateListenerOutput, error)
DescribeListeners(*elbv2.DescribeListenersInput) (*elbv2.DescribeListenersOutput, error)
DeleteListener(*elbv2.DeleteListenerInput) (*elbv2.DeleteListenerOutput, error)
ModifyListener(*elbv2.ModifyListenerInput) (*elbv2.ModifyListenerOutput, error)
WaitUntilLoadBalancersDeleted(*elbv2.DescribeLoadBalancersInput) error
}
// ASG is a simple pass-through of the Autoscaling client interface, which
// allows for testing.
type ASG interface {
@ -402,6 +455,7 @@ type InstanceGroupInfo interface {
type Cloud struct {
ec2 EC2
elb ELB
elbv2 ELBV2
asg ASG
kms KMS
metadata EC2Metadata
@ -587,6 +641,20 @@ func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) {
return elbClient, nil
}
func (p *awsSDKProvider) LoadBalancingV2(regionName string) (ELBV2, error) {
awsConfig := &aws.Config{
Region: &regionName,
Credentials: p.creds,
}
awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true)
elbClient := elbv2.New(session.New(awsConfig))
p.addHandlers(regionName, &elbClient.Handlers)
return elbClient, nil
}
func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) {
awsConfig := &aws.Config{
Region: &regionName,
@ -800,6 +868,10 @@ func (s *awsSdkEC2) ModifyInstanceAttribute(request *ec2.ModifyInstanceAttribute
return s.ec2.ModifyInstanceAttribute(request)
}
func (s *awsSdkEC2) DescribeVpcs(request *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) {
return s.ec2.DescribeVpcs(request)
}
func init() {
registerMetrics()
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) {
@ -913,6 +985,11 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
return nil, fmt.Errorf("error creating AWS ELB client: %v", err)
}
elbv2, err := awsServices.LoadBalancingV2(regionName)
if err != nil {
return nil, fmt.Errorf("error creating AWS ELBV2 client: %v", err)
}
asg, err := awsServices.Autoscaling(regionName)
if err != nil {
return nil, fmt.Errorf("error creating AWS autoscaling client: %v", err)
@ -926,6 +1003,7 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) {
awsCloud := &Cloud{
ec2: ec2,
elb: elb,
elbv2: elbv2,
asg: asg,
metadata: metadata,
kms: kms,
@ -2270,6 +2348,32 @@ func (c *Cloud) addLoadBalancerTags(loadBalancerName string, requested map[strin
return nil
}
// Gets the current load balancer state
func (c *Cloud) describeLoadBalancerv2(name string) (*elbv2.LoadBalancer, error) {
request := &elbv2.DescribeLoadBalancersInput{
Names: []*string{aws.String(name)},
}
response, err := c.elbv2.DescribeLoadBalancers(request)
if err != nil {
if awsError, ok := err.(awserr.Error); ok {
if awsError.Code() == elbv2.ErrCodeLoadBalancerNotFoundException {
return nil, nil
}
}
return nil, fmt.Errorf("Error describing load balancer: %q", err)
}
// AWS will not return 2 load balancers with the same name _and_ type.
for i := range response.LoadBalancers {
if aws.StringValue(response.LoadBalancers[i].Type) == elbv2.LoadBalancerTypeEnumNetwork {
return response.LoadBalancers[i], nil
}
}
return nil, fmt.Errorf("NLB '%s' could not be found", name)
}
// Retrieves instance's vpc id from metadata
func (c *Cloud) findVPCID() (string, error) {
macs, err := c.metadata.GetMetadata("network/interfaces/macs/")
@ -2959,6 +3063,8 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n
// Figure out what mappings we want on the load balancer
listeners := []*elb.Listener{}
v2Mappings := []nlbPortMapping{}
portList := getPortSets(annotations[ServiceAnnotationLoadBalancerSSLPorts])
for _, port := range apiService.Spec.Ports {
if port.Protocol != v1.ProtocolTCP {
@ -2968,6 +3074,17 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n
glog.Errorf("Ignoring port without NodePort defined: %v", port)
continue
}
if isNLB(annotations) {
v2Mappings = append(v2Mappings, nlbPortMapping{
FrontendPort: int64(port.Port),
TrafficPort: int64(port.NodePort),
// if externalTrafficPolicy == "Local", we'll override the
// health check later
HealthCheckPort: int64(port.NodePort),
HealthCheckProtocol: elbv2.ProtocolEnumTcp,
})
}
listener, err := buildListener(port, annotations, portList)
if err != nil {
return nil, err
@ -2996,6 +3113,69 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n
internalELB = true
}
if isNLB(annotations) {
if path, healthCheckNodePort := service.GetServiceHealthCheckPathPort(apiService); path != "" {
for i := range v2Mappings {
v2Mappings[i].HealthCheckPort = int64(healthCheckNodePort)
v2Mappings[i].HealthCheckPath = path
v2Mappings[i].HealthCheckProtocol = elbv2.ProtocolEnumHttp
}
}
// Find the subnets that the ELB will live in
subnetIDs, err := c.findELBSubnets(internalELB)
if err != nil {
glog.Errorf("Error listing subnets in VPC: %q", err)
return nil, err
}
// Bail out early if there are no subnets
if len(subnetIDs) == 0 {
return nil, fmt.Errorf("could not find any suitable subnets for creating the ELB")
}
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
instanceIDs := []string{}
for id := range instances {
instanceIDs = append(instanceIDs, string(id))
}
v2LoadBalancer, err := c.ensureLoadBalancerv2(
serviceName,
loadBalancerName,
v2Mappings,
instanceIDs,
subnetIDs,
internalELB,
annotations,
)
if err != nil {
return nil, err
}
sourceRangeCidrs := []string{}
for cidr := range sourceRanges {
sourceRangeCidrs = append(sourceRangeCidrs, cidr)
}
if len(sourceRangeCidrs) == 0 {
sourceRangeCidrs = append(sourceRangeCidrs, "0.0.0.0/0")
}
err = c.updateInstanceSecurityGroupsForNLB(v2Mappings, instances, loadBalancerName, sourceRangeCidrs)
if err != nil {
glog.Warningf("Error opening ingress rules for the load balancer to the instances: %q", err)
return nil, err
}
// We don't have an `ensureLoadBalancerInstances()` function for elbv2
// because `ensureLoadBalancerv2()` requires instance Ids
// TODO: Wait for creation?
return v2toStatus(v2LoadBalancer), nil
}
// Determine if we need to set the Proxy protocol policy
proxyProtocol := false
proxyProtocolAnnotation := apiService.Annotations[ServiceAnnotationLoadBalancerProxyProtocol]
@ -3240,6 +3420,18 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (c *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
if isNLB(service.Annotations) {
lb, err := c.describeLoadBalancerv2(loadBalancerName)
if err != nil {
return nil, false, err
}
if lb == nil {
return nil, false, nil
}
return v2toStatus(lb), true, nil
}
lb, err := c.describeLoadBalancer(loadBalancerName)
if err != nil {
return nil, false, err
@ -3265,6 +3457,24 @@ func toStatus(lb *elb.LoadBalancerDescription) *v1.LoadBalancerStatus {
return status
}
func v2toStatus(lb *elbv2.LoadBalancer) *v1.LoadBalancerStatus {
status := &v1.LoadBalancerStatus{}
if lb == nil {
glog.Error("[BUG] v2toStatus got nil input, this is a Kubernetes bug, please report")
return status
}
// We check for Active or Provisioning, the only successful statuses
if aws.StringValue(lb.DNSName) != "" && (aws.StringValue(lb.State.Code) == elbv2.LoadBalancerStateEnumActive ||
aws.StringValue(lb.State.Code) == elbv2.LoadBalancerStateEnumProvisioning) {
var ingress v1.LoadBalancerIngress
ingress.Hostname = aws.StringValue(lb.DNSName)
status.Ingress = []v1.LoadBalancerIngress{ingress}
}
return status
}
// Returns the first security group for an instance, or nil
// We only create instances with one security group, so we don't expect multiple security groups.
// However, if there are multiple security groups, we will choose the one tagged with our cluster filter.
@ -3470,6 +3680,139 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
// EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted.
func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
if isNLB(service.Annotations) {
lb, err := c.describeLoadBalancerv2(loadBalancerName)
if err != nil {
return err
}
if lb == nil {
glog.Info("Load balancer already deleted: ", loadBalancerName)
return nil
}
// Delete the LoadBalancer and target groups
//
// Deleting a target group while associated with a load balancer will
// fail. We delete the loadbalancer first. This does leave the
// possibility of zombie target groups if DeleteLoadBalancer() fails
//
// * Get target groups for NLB
// * Delete Load Balancer
// * Delete target groups
// * Clean up SecurityGroupRules
{
targetGroups, err := c.elbv2.DescribeTargetGroups(
&elbv2.DescribeTargetGroupsInput{LoadBalancerArn: lb.LoadBalancerArn},
)
if err != nil {
return fmt.Errorf("Error listing target groups before deleting load balancer: %q", err)
}
_, err = c.elbv2.DeleteLoadBalancer(
&elbv2.DeleteLoadBalancerInput{LoadBalancerArn: lb.LoadBalancerArn},
)
if err != nil {
return fmt.Errorf("Error deleting load balancer %q: %v", loadBalancerName, err)
}
for _, group := range targetGroups.TargetGroups {
_, err := c.elbv2.DeleteTargetGroup(
&elbv2.DeleteTargetGroupInput{TargetGroupArn: group.TargetGroupArn},
)
if err != nil {
return fmt.Errorf("Error deleting target groups after deleting load balancer: %q", err)
}
}
}
{
var matchingGroups []*ec2.SecurityGroup
{
// Server side filter
describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{
newEc2Filter("ip-permission.protocol", "tcp"),
}
describeRequest.Filters = c.tagging.addFilters(filters)
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil {
return fmt.Errorf("Error querying security groups for NLB: %q", err)
}
for _, sg := range response {
if !c.tagging.hasClusterTag(sg.Tags) {
continue
}
matchingGroups = append(matchingGroups, sg)
}
// client-side filter out groups that don't have IP Rules we've
// annotated for this service
matchingGroups = filterForIPRangeDescription(matchingGroups, loadBalancerName)
}
{
clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, loadBalancerName)
mtuRule := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, loadBalancerName)
healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, loadBalancerName)
for i := range matchingGroups {
removes := []*ec2.IpPermission{}
for j := range matchingGroups[i].IpPermissions {
v4rangesToRemove := []*ec2.IpRange{}
v6rangesToRemove := []*ec2.Ipv6Range{}
// Find IpPermission that contains k8s description
// If we removed the whole IpPermission, it could contain other non-k8s specified ranges
for k := range matchingGroups[i].IpPermissions[j].IpRanges {
description := aws.StringValue(matchingGroups[i].IpPermissions[j].IpRanges[k].Description)
if description == clientRule || description == mtuRule || description == healthRule {
v4rangesToRemove = append(v4rangesToRemove, matchingGroups[i].IpPermissions[j].IpRanges[k])
}
}
// Find IpPermission that contains k8s description
// If we removed the whole IpPermission, it could contain other non-k8s specified rangesk
for k := range matchingGroups[i].IpPermissions[j].Ipv6Ranges {
description := aws.StringValue(matchingGroups[i].IpPermissions[j].Ipv6Ranges[k].Description)
if description == clientRule || description == mtuRule || description == healthRule {
v6rangesToRemove = append(v6rangesToRemove, matchingGroups[i].IpPermissions[j].Ipv6Ranges[k])
}
}
if len(v4rangesToRemove) > 0 || len(v6rangesToRemove) > 0 {
// create a new *IpPermission to not accidentally remove UserIdGroupPairs
removedPermission := &ec2.IpPermission{
FromPort: matchingGroups[i].IpPermissions[j].FromPort,
IpProtocol: matchingGroups[i].IpPermissions[j].IpProtocol,
IpRanges: v4rangesToRemove,
Ipv6Ranges: v6rangesToRemove,
ToPort: matchingGroups[i].IpPermissions[j].ToPort,
}
removes = append(removes, removedPermission)
}
}
if len(removes) > 0 {
changed, err := c.removeSecurityGroupIngress(aws.StringValue(matchingGroups[i].GroupId), removes)
if err != nil {
return err
}
if !changed {
glog.Warning("Revoking ingress was not needed; concurrent change? groupId=", *matchingGroups[i].GroupId)
}
}
}
}
}
return nil
}
lb, err := c.describeLoadBalancer(loadBalancerName)
if err != nil {
return err
@ -3575,6 +3918,17 @@ func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, node
}
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
if isNLB(service.Annotations) {
lb, err := c.describeLoadBalancerv2(loadBalancerName)
if err != nil {
return err
}
if lb == nil {
return fmt.Errorf("Load balancer not found")
}
_, err = c.EnsureLoadBalancer(clusterName, service, nodes)
return err
}
lb, err := c.describeLoadBalancer(loadBalancerName)
if err != nil {
return err

View File

@ -23,6 +23,7 @@ import (
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/aws/aws-sdk-go/service/kms"
"github.com/golang/glog"
@ -38,6 +39,7 @@ type FakeAWSServices struct {
ec2 FakeEC2
elb ELB
elbv2 ELBV2
asg *FakeASG
metadata *FakeMetadata
kms *FakeKMS
@ -48,6 +50,7 @@ func NewFakeAWSServices(clusterId string) *FakeAWSServices {
s.region = "us-east-1"
s.ec2 = &FakeEC2Impl{aws: s}
s.elb = &FakeELB{aws: s}
s.elbv2 = &FakeELBV2{aws: s}
s.asg = &FakeASG{aws: s}
s.metadata = &FakeMetadata{aws: s}
s.kms = &FakeKMS{aws: s}
@ -90,6 +93,10 @@ func (s *FakeAWSServices) LoadBalancing(region string) (ELB, error) {
return s.elb, nil
}
func (s *FakeAWSServices) LoadBalancingV2(region string) (ELBV2, error) {
return s.elbv2, nil
}
func (s *FakeAWSServices) Autoscaling(region string) (ASG, error) {
return s.asg, nil
}
@ -246,6 +253,10 @@ func (ec2i *FakeEC2Impl) ModifyInstanceAttribute(request *ec2.ModifyInstanceAttr
panic("Not implemented")
}
func (ec2i *FakeEC2Impl) DescribeVpcs(request *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) {
return &ec2.DescribeVpcsOutput{Vpcs: []*ec2.Vpc{{CidrBlock: aws.String("172.20.0.0/16")}}}, nil
}
type FakeMetadata struct {
aws *FakeAWSServices
}
@ -376,6 +387,79 @@ func (self *FakeELB) expectDescribeLoadBalancers(loadBalancerName string) {
panic("Not implemented")
}
type FakeELBV2 struct {
aws *FakeAWSServices
}
func (self *FakeELBV2) AddTags(input *elbv2.AddTagsInput) (*elbv2.AddTagsOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) CreateLoadBalancer(*elbv2.CreateLoadBalancerInput) (*elbv2.CreateLoadBalancerOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DescribeLoadBalancers(*elbv2.DescribeLoadBalancersInput) (*elbv2.DescribeLoadBalancersOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DeleteLoadBalancer(*elbv2.DeleteLoadBalancerInput) (*elbv2.DeleteLoadBalancerOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) ModifyLoadBalancerAttributes(*elbv2.ModifyLoadBalancerAttributesInput) (*elbv2.ModifyLoadBalancerAttributesOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DescribeLoadBalancerAttributes(*elbv2.DescribeLoadBalancerAttributesInput) (*elbv2.DescribeLoadBalancerAttributesOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) CreateTargetGroup(*elbv2.CreateTargetGroupInput) (*elbv2.CreateTargetGroupOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DescribeTargetGroups(*elbv2.DescribeTargetGroupsInput) (*elbv2.DescribeTargetGroupsOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) ModifyTargetGroup(*elbv2.ModifyTargetGroupInput) (*elbv2.ModifyTargetGroupOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DeleteTargetGroup(*elbv2.DeleteTargetGroupInput) (*elbv2.DeleteTargetGroupOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DescribeTargetHealth(input *elbv2.DescribeTargetHealthInput) (*elbv2.DescribeTargetHealthOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DescribeTargetGroupAttributes(*elbv2.DescribeTargetGroupAttributesInput) (*elbv2.DescribeTargetGroupAttributesOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) ModifyTargetGroupAttributes(*elbv2.ModifyTargetGroupAttributesInput) (*elbv2.ModifyTargetGroupAttributesOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) RegisterTargets(*elbv2.RegisterTargetsInput) (*elbv2.RegisterTargetsOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DeregisterTargets(*elbv2.DeregisterTargetsInput) (*elbv2.DeregisterTargetsOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) CreateListener(*elbv2.CreateListenerInput) (*elbv2.CreateListenerOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DescribeListeners(*elbv2.DescribeListenersInput) (*elbv2.DescribeListenersOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) DeleteListener(*elbv2.DeleteListenerInput) (*elbv2.DeleteListenerOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) ModifyListener(*elbv2.ModifyListenerInput) (*elbv2.ModifyListenerOutput, error) {
panic("Not implemented")
}
func (self *FakeELBV2) WaitUntilLoadBalancersDeleted(*elbv2.DescribeLoadBalancersInput) error {
panic("Not implemented")
}
type FakeASG struct {
aws *FakeAWSServices
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package aws
import (
"crypto/sha1"
"fmt"
"reflect"
"strconv"
@ -26,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/golang/glog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
@ -36,6 +38,23 @@ const ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled"
const SSLNegotiationPolicyNameFormat = "k8s-SSLNegotiationPolicy-%s"
func isNLB(annotations map[string]string) bool {
if annotations[ServiceAnnotationLoadBalancerType] == "nlb" {
return true
}
return false
}
type nlbPortMapping struct {
FrontendPort int64
TrafficPort int64
ClientCIDR string
HealthCheckPort int64
HealthCheckPath string
HealthCheckProtocol string
}
// getLoadBalancerAdditionalTags converts the comma separated list of key-value
// pairs in the ServiceAnnotationLoadBalancerAdditionalTags annotation and returns
// it as a map.
@ -65,6 +84,760 @@ func getLoadBalancerAdditionalTags(annotations map[string]string) map[string]str
return additionalTags
}
// ensureLoadBalancerv2 ensures a v2 load balancer is created
func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBalancerName string, mappings []nlbPortMapping, instanceIDs, subnetIDs []string, internalELB bool, annotations map[string]string) (*elbv2.LoadBalancer, error) {
loadBalancer, err := c.describeLoadBalancerv2(loadBalancerName)
if err != nil {
return nil, err
}
dirty := false
// Get additional tags set by the user
tags := getLoadBalancerAdditionalTags(annotations)
// Add default tags
tags[TagNameKubernetesService] = namespacedName.String()
tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)
if loadBalancer == nil {
// Create the LB
createRequest := &elbv2.CreateLoadBalancerInput{
Type: aws.String(elbv2.LoadBalancerTypeEnumNetwork),
Name: aws.String(loadBalancerName),
}
if internalELB {
createRequest.Scheme = aws.String("internal")
}
// We are supposed to specify one subnet per AZ.
// TODO: What happens if we have more than one subnet per AZ?
createRequest.SubnetMappings = createSubnetMappings(subnetIDs)
for k, v := range tags {
createRequest.Tags = append(createRequest.Tags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
glog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
createResponse, err := c.elbv2.CreateLoadBalancer(createRequest)
if err != nil {
return nil, fmt.Errorf("Error creating load balancer: %q", err)
}
loadBalancer = createResponse.LoadBalancers[0]
// Create Target Groups
addTagsInput := &elbv2.AddTagsInput{
ResourceArns: []*string{},
Tags: []*elbv2.Tag{},
}
for i := range mappings {
// It is easier to keep track of updates by having possibly
// duplicate target groups where the backend port is the same
_, targetGroupArn, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId)
if err != nil {
return nil, fmt.Errorf("Error creating listener: %q", err)
}
addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn)
}
// Add tags to targets
for k, v := range tags {
addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 {
_, err = c.elbv2.AddTags(addTagsInput)
if err != nil {
return nil, fmt.Errorf("Error adding tags after creating Load Balancer: %q", err)
}
}
} else {
// TODO: Sync internal vs non-internal
// sync mappings
{
listenerDescriptions, err := c.elbv2.DescribeListeners(
&elbv2.DescribeListenersInput{
LoadBalancerArn: loadBalancer.LoadBalancerArn,
},
)
if err != nil {
return nil, fmt.Errorf("Error describing listeners: %q", err)
}
// actual maps FrontendPort to an elbv2.Listener
actual := map[int64]*elbv2.Listener{}
for _, listener := range listenerDescriptions.Listeners {
actual[*listener.Port] = listener
}
actualTargetGroups, err := c.elbv2.DescribeTargetGroups(
&elbv2.DescribeTargetGroupsInput{
LoadBalancerArn: loadBalancer.LoadBalancerArn,
},
)
if err != nil {
return nil, fmt.Errorf("Error listing target groups: %q", err)
}
nodePortTargetGroup := map[int64]*elbv2.TargetGroup{}
for _, targetGroup := range actualTargetGroups.TargetGroups {
nodePortTargetGroup[*targetGroup.Port] = targetGroup
}
// Create Target Groups
addTagsInput := &elbv2.AddTagsInput{
ResourceArns: []*string{},
Tags: []*elbv2.Tag{},
}
// Handle additions/modifications
for _, mapping := range mappings {
frontendPort := mapping.FrontendPort
nodePort := mapping.TrafficPort
// modifications
if listener, ok := actual[frontendPort]; ok {
// nodePort must have changed, we'll need to delete old TG
// and recreate
if targetGroup, ok := nodePortTargetGroup[nodePort]; !ok {
// Create new Target group
targetName := createTargetName(namespacedName, frontendPort, nodePort)
targetGroup, err = c.ensureTargetGroup(
nil,
mapping,
instanceIDs,
targetName,
*loadBalancer.VpcId,
)
if err != nil {
return nil, err
}
// Associate new target group to LB
_, err := c.elbv2.ModifyListener(&elbv2.ModifyListenerInput{
ListenerArn: listener.ListenerArn,
Port: aws.Int64(frontendPort),
Protocol: aws.String("TCP"),
DefaultActions: []*elbv2.Action{{
TargetGroupArn: targetGroup.TargetGroupArn,
Type: aws.String("forward"),
}},
})
if err != nil {
return nil, fmt.Errorf("Error updating load balancer listener: %q", err)
}
// Delete old target group
_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{
TargetGroupArn: listener.DefaultActions[0].TargetGroupArn,
})
if err != nil {
return nil, fmt.Errorf("Error deleting old target group: %q", err)
}
} else {
// Run ensureTargetGroup to make sure instances in service are up-to-date
targetName := createTargetName(namespacedName, frontendPort, nodePort)
_, err = c.ensureTargetGroup(
targetGroup,
mapping,
instanceIDs,
targetName,
*loadBalancer.VpcId,
)
if err != nil {
return nil, err
}
}
dirty = true
continue
}
// Additions
_, targetGroupArn, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId)
if err != nil {
return nil, err
}
addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn)
dirty = true
}
frontEndPorts := map[int64]bool{}
for i := range mappings {
frontEndPorts[mappings[i].FrontendPort] = true
}
// handle deletions
for port, listener := range actual {
if _, ok := frontEndPorts[port]; !ok {
err := c.deleteListenerV2(listener)
if err != nil {
return nil, err
}
dirty = true
}
}
// Add tags to new targets
for k, v := range tags {
addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 {
_, err = c.elbv2.AddTags(addTagsInput)
if err != nil {
return nil, fmt.Errorf("Error adding tags after modifying load balancer targets: %q", err)
}
}
}
// Subnets cannot be modified on NLBs
if dirty {
loadBalancers, err := c.elbv2.DescribeLoadBalancers(
&elbv2.DescribeLoadBalancersInput{
LoadBalancerArns: []*string{
loadBalancer.LoadBalancerArn,
},
},
)
if err != nil {
return nil, fmt.Errorf("Error retrieving load balancer after update: %q", err)
}
loadBalancer = loadBalancers.LoadBalancers[0]
}
}
return loadBalancer, nil
}
// create a valid target group name - ensure name is not over 32 characters
func createTargetName(namespacedName types.NamespacedName, frontendPort, nodePort int64) string {
sha := fmt.Sprintf("%x", sha1.Sum([]byte(namespacedName.String())))[:13]
return fmt.Sprintf("k8s-tg-%s-%d-%d", sha, frontendPort, nodePort)
}
func (c *Cloud) createListenerV2(loadBalancerArn *string, mapping nlbPortMapping, namespacedName types.NamespacedName, instanceIDs []string, vpcID string) (listener *elbv2.Listener, targetGroupArn *string, err error) {
targetName := createTargetName(namespacedName, mapping.FrontendPort, mapping.TrafficPort)
glog.Infof("Creating load balancer target group for %v with name: %s", namespacedName, targetName)
target, err := c.ensureTargetGroup(
nil,
mapping,
instanceIDs,
targetName,
vpcID,
)
if err != nil {
return nil, aws.String(""), err
}
createListernerInput := &elbv2.CreateListenerInput{
LoadBalancerArn: loadBalancerArn,
Port: aws.Int64(mapping.FrontendPort),
Protocol: aws.String("TCP"),
DefaultActions: []*elbv2.Action{{
TargetGroupArn: target.TargetGroupArn,
Type: aws.String(elbv2.ActionTypeEnumForward),
}},
}
glog.Infof("Creating load balancer listener for %v", namespacedName)
createListenerOutput, err := c.elbv2.CreateListener(createListernerInput)
if err != nil {
return nil, aws.String(""), fmt.Errorf("Error creating load balancer listener: %q", err)
}
return createListenerOutput.Listeners[0], target.TargetGroupArn, nil
}
// cleans up listener and corresponding target group
func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error {
_, err := c.elbv2.DeleteListener(&elbv2.DeleteListenerInput{ListenerArn: listener.ListenerArn})
if err != nil {
return fmt.Errorf("Error deleting load balancer listener: %q", err)
}
_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{TargetGroupArn: listener.DefaultActions[0].TargetGroupArn})
if err != nil {
return fmt.Errorf("Error deleting load balancer target group: %q", err)
}
return nil
}
// ensureTargetGroup creates a target group with a set of instances
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, mapping nlbPortMapping, instances []string, name string, vpcID string) (*elbv2.TargetGroup, error) {
dirty := false
if targetGroup == nil {
input := &elbv2.CreateTargetGroupInput{
VpcId: aws.String(vpcID),
Name: aws.String(name),
Port: aws.Int64(mapping.TrafficPort),
Protocol: aws.String("TCP"),
TargetType: aws.String("instance"),
HealthCheckIntervalSeconds: aws.Int64(30),
HealthCheckPort: aws.String("traffic-port"),
HealthCheckProtocol: aws.String("TCP"),
HealthyThresholdCount: aws.Int64(3),
UnhealthyThresholdCount: aws.Int64(3),
}
input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol)
if mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp {
input.HealthCheckPath = aws.String(mapping.HealthCheckPath)
}
// Account for externalTrafficPolicy = "Local"
if mapping.HealthCheckPort != mapping.TrafficPort {
input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort)))
}
result, err := c.elbv2.CreateTargetGroup(input)
if err != nil {
return nil, fmt.Errorf("Error creating load balancer target group: %q", err)
}
if len(result.TargetGroups) != 1 {
return nil, fmt.Errorf("Expected only one target group on CreateTargetGroup, got %d groups", len(result.TargetGroups))
}
registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: result.TargetGroups[0].TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for _, instanceID := range instances {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(string(instanceID)),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err = c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("Error registering targets for load balancer: %q", err)
}
return result.TargetGroups[0], nil
}
// handle instances in service
{
healthResponse, err := c.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: targetGroup.TargetGroupArn})
if err != nil {
return nil, fmt.Errorf("Error describing target group health: %q", err)
}
actualIDs := []string{}
for _, healthDescription := range healthResponse.TargetHealthDescriptions {
if healthDescription.TargetHealth.Reason != nil {
switch aws.StringValue(healthDescription.TargetHealth.Reason) {
case elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress:
// We don't need to count this instance in service if it is
// on its way out
default:
actualIDs = append(actualIDs, *healthDescription.Target.Id)
}
}
}
actual := sets.NewString(actualIDs...)
expected := sets.NewString(instances...)
additions := expected.Difference(actual)
removals := actual.Difference(expected)
if len(additions) > 0 {
registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range additions {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("Error registering new targets in target group: %q", err)
}
dirty = true
}
if len(removals) > 0 {
deregisterInput := &elbv2.DeregisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range removals {
deregisterInput.Targets = append(deregisterInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.DeregisterTargets(deregisterInput)
if err != nil {
return nil, fmt.Errorf("Error trying to deregister targets in target group: %q", err)
}
dirty = true
}
}
// ensure the health check is correct
{
dirtyHealthCheck := false
input := &elbv2.ModifyTargetGroupInput{
TargetGroupArn: targetGroup.TargetGroupArn,
}
if aws.StringValue(targetGroup.HealthCheckProtocol) != mapping.HealthCheckProtocol {
input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol)
dirtyHealthCheck = true
}
if aws.StringValue(targetGroup.HealthCheckPort) != strconv.Itoa(int(mapping.HealthCheckPort)) {
input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort)))
dirtyHealthCheck = true
}
if mapping.HealthCheckPath != "" && mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp {
input.HealthCheckPath = aws.String(mapping.HealthCheckPath)
dirtyHealthCheck = true
}
if dirtyHealthCheck {
_, err := c.elbv2.ModifyTargetGroup(input)
if err != nil {
return nil, fmt.Errorf("Error modifying target group health check: %q", err)
}
dirty = true
}
}
if dirty {
result, err := c.elbv2.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{
Names: []*string{aws.String(name)},
})
if err != nil {
return nil, fmt.Errorf("Error retrieving target group after creation/update: %q", err)
}
targetGroup = result.TargetGroups[0]
}
return targetGroup, nil
}
func portsForNLB(lbName string, sg *ec2.SecurityGroup, clientTraffic bool) sets.Int64 {
response := sets.NewInt64()
var annotation string
if clientTraffic {
annotation = fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
} else {
annotation = fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
}
for i := range sg.IpPermissions {
for j := range sg.IpPermissions[i].IpRanges {
description := aws.StringValue(sg.IpPermissions[i].IpRanges[j].Description)
if description == annotation {
// TODO should probably check FromPort == ToPort
response.Insert(aws.Int64Value(sg.IpPermissions[i].FromPort))
}
}
}
return response
}
// filterForIPRangeDescription filters in security groups that have IpRange Descriptions that match a loadBalancerName
func filterForIPRangeDescription(securityGroups []*ec2.SecurityGroup, lbName string) []*ec2.SecurityGroup {
response := []*ec2.SecurityGroup{}
clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
for i := range securityGroups {
for j := range securityGroups[i].IpPermissions {
for k := range securityGroups[i].IpPermissions[j].IpRanges {
description := aws.StringValue(securityGroups[i].IpPermissions[j].IpRanges[k].Description)
if description == clientRule || description == healthRule {
response = append(response, securityGroups[i])
}
}
}
}
return response
}
func (c *Cloud) getVpcCidrBlock() (*string, error) {
vpcs, err := c.ec2.DescribeVpcs(&ec2.DescribeVpcsInput{
VpcIds: []*string{aws.String(c.vpcID)},
})
if err != nil {
return nil, fmt.Errorf("Error querying VPC for ELB: %q", err)
}
if len(vpcs.Vpcs) != 1 {
return nil, fmt.Errorf("Error querying VPC for ELB, got %d vpcs for %s", len(vpcs.Vpcs), c.vpcID)
}
return vpcs.Vpcs[0].CidrBlock, nil
}
// abstraction for updating SG rules
// if clientTraffic is false, then only update HealthCheck rules
func (c *Cloud) updateInstanceSecurityGroupsForNLBTraffic(actualGroups []*ec2.SecurityGroup, desiredSgIds []string, ports []int64, lbName string, clientCidrs []string, clientTraffic bool) error {
// Map containing the groups we want to make changes on; the ports to make
// changes on; and whether to add or remove it. true to add, false to remove
portChanges := map[string]map[int64]bool{}
for _, id := range desiredSgIds {
// consider everything an addition for now
if _, ok := portChanges[id]; !ok {
portChanges[id] = make(map[int64]bool)
}
for _, port := range ports {
portChanges[id][port] = true
}
}
// Compare to actual groups
for _, actualGroup := range actualGroups {
actualGroupID := aws.StringValue(actualGroup.GroupId)
if actualGroupID == "" {
glog.Warning("Ignoring group without ID: ", actualGroup)
continue
}
addingMap, ok := portChanges[actualGroupID]
if ok {
desiredSet := sets.NewInt64()
for port := range addingMap {
desiredSet.Insert(port)
}
existingSet := portsForNLB(lbName, actualGroup, clientTraffic)
// remove from portChanges ports that are already allowed
if intersection := desiredSet.Intersection(existingSet); intersection.Len() > 0 {
for p := range intersection {
delete(portChanges[actualGroupID], p)
}
}
// allowed ports that need to be removed
if difference := existingSet.Difference(desiredSet); difference.Len() > 0 {
for p := range difference {
portChanges[actualGroupID][p] = false
}
}
}
}
// Make changes we've planned on
for instanceSecurityGroupID, portMap := range portChanges {
adds := []*ec2.IpPermission{}
removes := []*ec2.IpPermission{}
for port, add := range portMap {
if add {
if clientTraffic {
glog.V(2).Infof("Adding rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
glog.V(2).Infof("Adding rule for client traffic from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
} else {
glog.V(2).Infof("Adding rule for health check traffic from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
}
} else {
if clientTraffic {
glog.V(2).Infof("Removing rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
glog.V(2).Infof("Removing rule for client traffic from the network load balancer (%s) to instance (%s)", clientCidrs, instanceSecurityGroupID)
}
glog.V(2).Infof("Removing rule for health check traffic from the network load balancer (%s) to instance (%s)", clientCidrs, instanceSecurityGroupID)
}
if clientTraffic {
clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
mtuRuleAnnotation := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, lbName)
// Client Traffic
permission := &ec2.IpPermission{
FromPort: aws.Int64(port),
ToPort: aws.Int64(port),
IpProtocol: aws.String("tcp"),
}
ranges := []*ec2.IpRange{}
for _, cidr := range clientCidrs {
ranges = append(ranges, &ec2.IpRange{
CidrIp: aws.String(cidr),
Description: aws.String(clientRuleAnnotation),
})
}
permission.IpRanges = ranges
if add {
adds = append(adds, permission)
} else {
removes = append(removes, permission)
}
// MTU discovery
permission = &ec2.IpPermission{
IpProtocol: aws.String("icmp"),
FromPort: aws.Int64(3),
ToPort: aws.Int64(4),
}
ranges = []*ec2.IpRange{}
for _, cidr := range clientCidrs {
ranges = append(ranges, &ec2.IpRange{
CidrIp: aws.String(cidr),
Description: aws.String(mtuRuleAnnotation),
})
}
permission.IpRanges = ranges
if add {
adds = append(adds, permission)
} else {
removes = append(removes, permission)
}
} else {
healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
// NLB HealthCheck
permission := &ec2.IpPermission{
FromPort: aws.Int64(port),
ToPort: aws.Int64(port),
IpProtocol: aws.String("tcp"),
}
ranges := []*ec2.IpRange{}
for _, cidr := range clientCidrs {
ranges = append(ranges, &ec2.IpRange{
CidrIp: aws.String(cidr),
Description: aws.String(healthRuleAnnotation),
})
}
permission.IpRanges = ranges
if add {
adds = append(adds, permission)
} else {
removes = append(removes, permission)
}
}
}
if len(adds) > 0 {
changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, adds)
if err != nil {
return err
}
if !changed {
glog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
}
}
if len(removes) > 0 {
changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, removes)
if err != nil {
return err
}
if !changed {
glog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
}
}
}
return nil
}
// Add SG rules for a given NLB
func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, instances map[awsInstanceID]*ec2.Instance, lbName string, clientCidrs []string) error {
if c.cfg.Global.DisableSecurityGroupIngress {
return nil
}
vpcCidr, err := c.getVpcCidrBlock()
if err != nil {
return err
}
// Unlike the classic ELB, NLB does not have a security group that we can
// filter against all existing groups to see if they allow access. Instead
// we use the IpRange.Description field to annotate NLB health check and
// client traffic rules
// Get the actual list of groups that allow ingress for the load-balancer
var actualGroups []*ec2.SecurityGroup
{
// Server side filter
describeRequest := &ec2.DescribeSecurityGroupsInput{}
filters := []*ec2.Filter{
newEc2Filter("ip-permission.protocol", "tcp"),
}
describeRequest.Filters = c.tagging.addFilters(filters)
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
if err != nil {
return fmt.Errorf("Error querying security groups for NLB: %q", err)
}
for _, sg := range response {
if !c.tagging.hasClusterTag(sg.Tags) {
continue
}
actualGroups = append(actualGroups, sg)
}
// client-side filter
// Filter out groups that don't have IP Rules we've annotated for this service
actualGroups = filterForIPRangeDescription(actualGroups, lbName)
}
taggedSecurityGroups, err := c.getTaggedSecurityGroups()
if err != nil {
return fmt.Errorf("Error querying for tagged security groups: %q", err)
}
externalTrafficPolicyIsLocal := false
trafficPorts := []int64{}
for i := range mappings {
trafficPorts = append(trafficPorts, mappings[i].TrafficPort)
if mappings[i].TrafficPort != mappings[i].HealthCheckPort {
externalTrafficPolicyIsLocal = true
}
}
healthCheckPorts := trafficPorts
// if externalTrafficPolicy is Local, all listeners use the same health
// check port
if externalTrafficPolicyIsLocal && len(mappings) > 0 {
healthCheckPorts = []int64{mappings[0].HealthCheckPort}
}
desiredGroupIds := []string{}
// Scan instances for groups we want open
for _, instance := range instances {
securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups)
if err != nil {
return err
}
if securityGroup == nil {
glog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId))
continue
}
id := aws.StringValue(securityGroup.GroupId)
if id == "" {
glog.Warningf("found security group without id: %v", securityGroup)
continue
}
desiredGroupIds = append(desiredGroupIds, id)
}
// Run once for Client traffic
err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, trafficPorts, lbName, clientCidrs, true)
if err != nil {
return err
}
// Run once for health check traffic
err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, healthCheckPorts, lbName, []string{aws.StringValue(vpcCidr)}, false)
if err != nil {
return err
}
return nil
}
func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB, proxyProtocol bool, loadBalancerAttributes *elb.LoadBalancerAttributes, annotations map[string]string) (*elb.LoadBalancerDescription, error) {
loadBalancer, err := c.describeLoadBalancer(loadBalancerName)
if err != nil {
@ -371,6 +1144,17 @@ func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBala
return loadBalancer, nil
}
func createSubnetMappings(subnetIDs []string) []*elbv2.SubnetMapping {
response := []*elbv2.SubnetMapping{}
for _, id := range subnetIDs {
// Ignore AllocationId for now
response = append(response, &elbv2.SubnetMapping{SubnetId: aws.String(id)})
}
return response
}
// elbProtocolsAreEqual checks if two ELB protocol strings are considered the same
// Comparison is case insensitive
func elbProtocolsAreEqual(l, r *string) bool {

View File

@ -125,3 +125,37 @@ func TestAWSARNEquals(t *testing.T) {
}
}
}
func TestIsNLB(t *testing.T) {
tests := []struct {
name string
annotations map[string]string
want bool
}{
{
"NLB annotation provided",
map[string]string{"service.beta.kubernetes.io/aws-load-balancer-type": "nlb"},
true,
},
{
"NLB annotation has invalid value",
map[string]string{"service.beta.kubernetes.io/aws-load-balancer-type": "elb"},
false,
},
{
"NLB annotation absent",
map[string]string{},
false,
},
}
for _, test := range tests {
t.Logf("Running test case %s", test.name)
got := isNLB(test.annotations)
if got != test.want {
t.Errorf("Incorrect value for isNLB() case %s. Got %t, expected %t.", test.name, got, test.want)
}
}
}