diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index dc96a9a5047..b29199072a3 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -38,6 +38,7 @@ import ( "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/elb" + "github.com/aws/aws-sdk-go/service/elbv2" "github.com/aws/aws-sdk-go/service/kms" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" @@ -61,6 +62,18 @@ import ( volumeutil "k8s.io/kubernetes/pkg/volume/util" ) +// NLBHealthCheckRuleDescription is the comment used on a security group rule to +// indicate that it is used for health checks +const NLBHealthCheckRuleDescription = "kubernetes.io/rule/nlb/health" + +// NLBClientRuleDescription is the comment used on a security group rule to +// indicate that it is used for client traffic +const NLBClientRuleDescription = "kubernetes.io/rule/nlb/client" + +// NLBMtuDiscoveryRuleDescription is the comment used on a security group rule +// to indicate that it is used for mtu discovery +const NLBMtuDiscoveryRuleDescription = "kubernetes.io/rule/nlb/mtu" + // ProviderName is the name of this cloud provider. const ProviderName = "aws" @@ -76,6 +89,11 @@ const TagNameSubnetInternalELB = "kubernetes.io/role/internal-elb" // it should be used for internet ELBs const TagNameSubnetPublicELB = "kubernetes.io/role/elb" +// ServiceAnnotationLoadBalancerType is the annotation used on the service +// to indicate what type of Load Balancer we want. Right now, the only accepted +// value is "nlb" +const ServiceAnnotationLoadBalancerType = "service.beta.kubernetes.io/aws-load-balancer-type" + // ServiceAnnotationLoadBalancerInternal is the annotation used on the service // to indicate that we want an internal ELB. const ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/aws-load-balancer-internal" @@ -224,6 +242,7 @@ var _ cloudprovider.PVLabeler = (*Cloud)(nil) type Services interface { Compute(region string) (EC2, error) LoadBalancing(region string) (ELB, error) + LoadBalancingV2(region string) (ELBV2, error) Autoscaling(region string) (ASG, error) Metadata() (EC2Metadata, error) KeyManagement(region string) (KMS, error) @@ -264,6 +283,8 @@ type EC2 interface { DeleteRoute(request *ec2.DeleteRouteInput) (*ec2.DeleteRouteOutput, error) ModifyInstanceAttribute(request *ec2.ModifyInstanceAttributeInput) (*ec2.ModifyInstanceAttributeOutput, error) + + DescribeVpcs(input *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) } // ELB is a simple pass-through of AWS' ELB client interface, which allows for testing @@ -293,6 +314,38 @@ type ELB interface { ModifyLoadBalancerAttributes(*elb.ModifyLoadBalancerAttributesInput) (*elb.ModifyLoadBalancerAttributesOutput, error) } +// ELBV2 is a simple pass-through of AWS' ELBV2 client interface, which allows for testing +type ELBV2 interface { + AddTags(input *elbv2.AddTagsInput) (*elbv2.AddTagsOutput, error) + + CreateLoadBalancer(*elbv2.CreateLoadBalancerInput) (*elbv2.CreateLoadBalancerOutput, error) + DescribeLoadBalancers(*elbv2.DescribeLoadBalancersInput) (*elbv2.DescribeLoadBalancersOutput, error) + DeleteLoadBalancer(*elbv2.DeleteLoadBalancerInput) (*elbv2.DeleteLoadBalancerOutput, error) + + ModifyLoadBalancerAttributes(*elbv2.ModifyLoadBalancerAttributesInput) (*elbv2.ModifyLoadBalancerAttributesOutput, error) + DescribeLoadBalancerAttributes(*elbv2.DescribeLoadBalancerAttributesInput) (*elbv2.DescribeLoadBalancerAttributesOutput, error) + + CreateTargetGroup(*elbv2.CreateTargetGroupInput) (*elbv2.CreateTargetGroupOutput, error) + DescribeTargetGroups(*elbv2.DescribeTargetGroupsInput) (*elbv2.DescribeTargetGroupsOutput, error) + ModifyTargetGroup(*elbv2.ModifyTargetGroupInput) (*elbv2.ModifyTargetGroupOutput, error) + DeleteTargetGroup(*elbv2.DeleteTargetGroupInput) (*elbv2.DeleteTargetGroupOutput, error) + + DescribeTargetHealth(input *elbv2.DescribeTargetHealthInput) (*elbv2.DescribeTargetHealthOutput, error) + + DescribeTargetGroupAttributes(*elbv2.DescribeTargetGroupAttributesInput) (*elbv2.DescribeTargetGroupAttributesOutput, error) + ModifyTargetGroupAttributes(*elbv2.ModifyTargetGroupAttributesInput) (*elbv2.ModifyTargetGroupAttributesOutput, error) + + RegisterTargets(*elbv2.RegisterTargetsInput) (*elbv2.RegisterTargetsOutput, error) + DeregisterTargets(*elbv2.DeregisterTargetsInput) (*elbv2.DeregisterTargetsOutput, error) + + CreateListener(*elbv2.CreateListenerInput) (*elbv2.CreateListenerOutput, error) + DescribeListeners(*elbv2.DescribeListenersInput) (*elbv2.DescribeListenersOutput, error) + DeleteListener(*elbv2.DeleteListenerInput) (*elbv2.DeleteListenerOutput, error) + ModifyListener(*elbv2.ModifyListenerInput) (*elbv2.ModifyListenerOutput, error) + + WaitUntilLoadBalancersDeleted(*elbv2.DescribeLoadBalancersInput) error +} + // ASG is a simple pass-through of the Autoscaling client interface, which // allows for testing. type ASG interface { @@ -402,6 +455,7 @@ type InstanceGroupInfo interface { type Cloud struct { ec2 EC2 elb ELB + elbv2 ELBV2 asg ASG kms KMS metadata EC2Metadata @@ -587,6 +641,20 @@ func (p *awsSDKProvider) LoadBalancing(regionName string) (ELB, error) { return elbClient, nil } +func (p *awsSDKProvider) LoadBalancingV2(regionName string) (ELBV2, error) { + awsConfig := &aws.Config{ + Region: ®ionName, + Credentials: p.creds, + } + awsConfig = awsConfig.WithCredentialsChainVerboseErrors(true) + + elbClient := elbv2.New(session.New(awsConfig)) + + p.addHandlers(regionName, &elbClient.Handlers) + + return elbClient, nil +} + func (p *awsSDKProvider) Autoscaling(regionName string) (ASG, error) { awsConfig := &aws.Config{ Region: ®ionName, @@ -800,6 +868,10 @@ func (s *awsSdkEC2) ModifyInstanceAttribute(request *ec2.ModifyInstanceAttribute return s.ec2.ModifyInstanceAttribute(request) } +func (s *awsSdkEC2) DescribeVpcs(request *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) { + return s.ec2.DescribeVpcs(request) +} + func init() { registerMetrics() cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { @@ -913,6 +985,11 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) { return nil, fmt.Errorf("error creating AWS ELB client: %v", err) } + elbv2, err := awsServices.LoadBalancingV2(regionName) + if err != nil { + return nil, fmt.Errorf("error creating AWS ELBV2 client: %v", err) + } + asg, err := awsServices.Autoscaling(regionName) if err != nil { return nil, fmt.Errorf("error creating AWS autoscaling client: %v", err) @@ -926,6 +1003,7 @@ func newAWSCloud(config io.Reader, awsServices Services) (*Cloud, error) { awsCloud := &Cloud{ ec2: ec2, elb: elb, + elbv2: elbv2, asg: asg, metadata: metadata, kms: kms, @@ -2270,6 +2348,32 @@ func (c *Cloud) addLoadBalancerTags(loadBalancerName string, requested map[strin return nil } +// Gets the current load balancer state +func (c *Cloud) describeLoadBalancerv2(name string) (*elbv2.LoadBalancer, error) { + request := &elbv2.DescribeLoadBalancersInput{ + Names: []*string{aws.String(name)}, + } + + response, err := c.elbv2.DescribeLoadBalancers(request) + if err != nil { + if awsError, ok := err.(awserr.Error); ok { + if awsError.Code() == elbv2.ErrCodeLoadBalancerNotFoundException { + return nil, nil + } + } + return nil, fmt.Errorf("Error describing load balancer: %q", err) + } + + // AWS will not return 2 load balancers with the same name _and_ type. + for i := range response.LoadBalancers { + if aws.StringValue(response.LoadBalancers[i].Type) == elbv2.LoadBalancerTypeEnumNetwork { + return response.LoadBalancers[i], nil + } + } + + return nil, fmt.Errorf("NLB '%s' could not be found", name) +} + // Retrieves instance's vpc id from metadata func (c *Cloud) findVPCID() (string, error) { macs, err := c.metadata.GetMetadata("network/interfaces/macs/") @@ -2959,6 +3063,8 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n // Figure out what mappings we want on the load balancer listeners := []*elb.Listener{} + v2Mappings := []nlbPortMapping{} + portList := getPortSets(annotations[ServiceAnnotationLoadBalancerSSLPorts]) for _, port := range apiService.Spec.Ports { if port.Protocol != v1.ProtocolTCP { @@ -2968,6 +3074,17 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n glog.Errorf("Ignoring port without NodePort defined: %v", port) continue } + + if isNLB(annotations) { + v2Mappings = append(v2Mappings, nlbPortMapping{ + FrontendPort: int64(port.Port), + TrafficPort: int64(port.NodePort), + // if externalTrafficPolicy == "Local", we'll override the + // health check later + HealthCheckPort: int64(port.NodePort), + HealthCheckProtocol: elbv2.ProtocolEnumTcp, + }) + } listener, err := buildListener(port, annotations, portList) if err != nil { return nil, err @@ -2996,6 +3113,69 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n internalELB = true } + if isNLB(annotations) { + + if path, healthCheckNodePort := service.GetServiceHealthCheckPathPort(apiService); path != "" { + for i := range v2Mappings { + v2Mappings[i].HealthCheckPort = int64(healthCheckNodePort) + v2Mappings[i].HealthCheckPath = path + v2Mappings[i].HealthCheckProtocol = elbv2.ProtocolEnumHttp + } + } + + // Find the subnets that the ELB will live in + subnetIDs, err := c.findELBSubnets(internalELB) + if err != nil { + glog.Errorf("Error listing subnets in VPC: %q", err) + return nil, err + } + // Bail out early if there are no subnets + if len(subnetIDs) == 0 { + return nil, fmt.Errorf("could not find any suitable subnets for creating the ELB") + } + + loadBalancerName := cloudprovider.GetLoadBalancerName(apiService) + serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name} + + instanceIDs := []string{} + for id := range instances { + instanceIDs = append(instanceIDs, string(id)) + } + + v2LoadBalancer, err := c.ensureLoadBalancerv2( + serviceName, + loadBalancerName, + v2Mappings, + instanceIDs, + subnetIDs, + internalELB, + annotations, + ) + if err != nil { + return nil, err + } + + sourceRangeCidrs := []string{} + for cidr := range sourceRanges { + sourceRangeCidrs = append(sourceRangeCidrs, cidr) + } + if len(sourceRangeCidrs) == 0 { + sourceRangeCidrs = append(sourceRangeCidrs, "0.0.0.0/0") + } + + err = c.updateInstanceSecurityGroupsForNLB(v2Mappings, instances, loadBalancerName, sourceRangeCidrs) + if err != nil { + glog.Warningf("Error opening ingress rules for the load balancer to the instances: %q", err) + return nil, err + } + + // We don't have an `ensureLoadBalancerInstances()` function for elbv2 + // because `ensureLoadBalancerv2()` requires instance Ids + + // TODO: Wait for creation? + return v2toStatus(v2LoadBalancer), nil + } + // Determine if we need to set the Proxy protocol policy proxyProtocol := false proxyProtocolAnnotation := apiService.Annotations[ServiceAnnotationLoadBalancerProxyProtocol] @@ -3240,6 +3420,18 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n // GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer func (c *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) { loadBalancerName := cloudprovider.GetLoadBalancerName(service) + + if isNLB(service.Annotations) { + lb, err := c.describeLoadBalancerv2(loadBalancerName) + if err != nil { + return nil, false, err + } + if lb == nil { + return nil, false, nil + } + return v2toStatus(lb), true, nil + } + lb, err := c.describeLoadBalancer(loadBalancerName) if err != nil { return nil, false, err @@ -3265,6 +3457,24 @@ func toStatus(lb *elb.LoadBalancerDescription) *v1.LoadBalancerStatus { return status } +func v2toStatus(lb *elbv2.LoadBalancer) *v1.LoadBalancerStatus { + status := &v1.LoadBalancerStatus{} + if lb == nil { + glog.Error("[BUG] v2toStatus got nil input, this is a Kubernetes bug, please report") + return status + } + + // We check for Active or Provisioning, the only successful statuses + if aws.StringValue(lb.DNSName) != "" && (aws.StringValue(lb.State.Code) == elbv2.LoadBalancerStateEnumActive || + aws.StringValue(lb.State.Code) == elbv2.LoadBalancerStateEnumProvisioning) { + var ingress v1.LoadBalancerIngress + ingress.Hostname = aws.StringValue(lb.DNSName) + status.Ingress = []v1.LoadBalancerIngress{ingress} + } + + return status +} + // Returns the first security group for an instance, or nil // We only create instances with one security group, so we don't expect multiple security groups. // However, if there are multiple security groups, we will choose the one tagged with our cluster filter. @@ -3470,6 +3680,139 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer // EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted. func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error { loadBalancerName := cloudprovider.GetLoadBalancerName(service) + + if isNLB(service.Annotations) { + lb, err := c.describeLoadBalancerv2(loadBalancerName) + if err != nil { + return err + } + if lb == nil { + glog.Info("Load balancer already deleted: ", loadBalancerName) + return nil + } + + // Delete the LoadBalancer and target groups + // + // Deleting a target group while associated with a load balancer will + // fail. We delete the loadbalancer first. This does leave the + // possibility of zombie target groups if DeleteLoadBalancer() fails + // + // * Get target groups for NLB + // * Delete Load Balancer + // * Delete target groups + // * Clean up SecurityGroupRules + { + + targetGroups, err := c.elbv2.DescribeTargetGroups( + &elbv2.DescribeTargetGroupsInput{LoadBalancerArn: lb.LoadBalancerArn}, + ) + if err != nil { + return fmt.Errorf("Error listing target groups before deleting load balancer: %q", err) + } + + _, err = c.elbv2.DeleteLoadBalancer( + &elbv2.DeleteLoadBalancerInput{LoadBalancerArn: lb.LoadBalancerArn}, + ) + if err != nil { + return fmt.Errorf("Error deleting load balancer %q: %v", loadBalancerName, err) + } + + for _, group := range targetGroups.TargetGroups { + _, err := c.elbv2.DeleteTargetGroup( + &elbv2.DeleteTargetGroupInput{TargetGroupArn: group.TargetGroupArn}, + ) + if err != nil { + return fmt.Errorf("Error deleting target groups after deleting load balancer: %q", err) + } + } + } + + { + var matchingGroups []*ec2.SecurityGroup + { + // Server side filter + describeRequest := &ec2.DescribeSecurityGroupsInput{} + filters := []*ec2.Filter{ + newEc2Filter("ip-permission.protocol", "tcp"), + } + describeRequest.Filters = c.tagging.addFilters(filters) + response, err := c.ec2.DescribeSecurityGroups(describeRequest) + if err != nil { + return fmt.Errorf("Error querying security groups for NLB: %q", err) + } + for _, sg := range response { + if !c.tagging.hasClusterTag(sg.Tags) { + continue + } + matchingGroups = append(matchingGroups, sg) + } + + // client-side filter out groups that don't have IP Rules we've + // annotated for this service + matchingGroups = filterForIPRangeDescription(matchingGroups, loadBalancerName) + } + + { + clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, loadBalancerName) + mtuRule := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, loadBalancerName) + healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, loadBalancerName) + + for i := range matchingGroups { + removes := []*ec2.IpPermission{} + for j := range matchingGroups[i].IpPermissions { + + v4rangesToRemove := []*ec2.IpRange{} + v6rangesToRemove := []*ec2.Ipv6Range{} + + // Find IpPermission that contains k8s description + // If we removed the whole IpPermission, it could contain other non-k8s specified ranges + for k := range matchingGroups[i].IpPermissions[j].IpRanges { + description := aws.StringValue(matchingGroups[i].IpPermissions[j].IpRanges[k].Description) + if description == clientRule || description == mtuRule || description == healthRule { + v4rangesToRemove = append(v4rangesToRemove, matchingGroups[i].IpPermissions[j].IpRanges[k]) + } + } + + // Find IpPermission that contains k8s description + // If we removed the whole IpPermission, it could contain other non-k8s specified rangesk + for k := range matchingGroups[i].IpPermissions[j].Ipv6Ranges { + description := aws.StringValue(matchingGroups[i].IpPermissions[j].Ipv6Ranges[k].Description) + if description == clientRule || description == mtuRule || description == healthRule { + v6rangesToRemove = append(v6rangesToRemove, matchingGroups[i].IpPermissions[j].Ipv6Ranges[k]) + } + } + + if len(v4rangesToRemove) > 0 || len(v6rangesToRemove) > 0 { + // create a new *IpPermission to not accidentally remove UserIdGroupPairs + removedPermission := &ec2.IpPermission{ + FromPort: matchingGroups[i].IpPermissions[j].FromPort, + IpProtocol: matchingGroups[i].IpPermissions[j].IpProtocol, + IpRanges: v4rangesToRemove, + Ipv6Ranges: v6rangesToRemove, + ToPort: matchingGroups[i].IpPermissions[j].ToPort, + } + removes = append(removes, removedPermission) + } + + } + if len(removes) > 0 { + changed, err := c.removeSecurityGroupIngress(aws.StringValue(matchingGroups[i].GroupId), removes) + if err != nil { + return err + } + if !changed { + glog.Warning("Revoking ingress was not needed; concurrent change? groupId=", *matchingGroups[i].GroupId) + } + } + + } + + } + + } + return nil + } + lb, err := c.describeLoadBalancer(loadBalancerName) if err != nil { return err @@ -3575,6 +3918,17 @@ func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, node } loadBalancerName := cloudprovider.GetLoadBalancerName(service) + if isNLB(service.Annotations) { + lb, err := c.describeLoadBalancerv2(loadBalancerName) + if err != nil { + return err + } + if lb == nil { + return fmt.Errorf("Load balancer not found") + } + _, err = c.EnsureLoadBalancer(clusterName, service, nodes) + return err + } lb, err := c.describeLoadBalancer(loadBalancerName) if err != nil { return err diff --git a/pkg/cloudprovider/providers/aws/aws_fakes.go b/pkg/cloudprovider/providers/aws/aws_fakes.go index 03996064cc2..036968673e7 100644 --- a/pkg/cloudprovider/providers/aws/aws_fakes.go +++ b/pkg/cloudprovider/providers/aws/aws_fakes.go @@ -23,6 +23,7 @@ import ( "github.com/aws/aws-sdk-go/service/autoscaling" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/elb" + "github.com/aws/aws-sdk-go/service/elbv2" "github.com/aws/aws-sdk-go/service/kms" "github.com/golang/glog" @@ -38,6 +39,7 @@ type FakeAWSServices struct { ec2 FakeEC2 elb ELB + elbv2 ELBV2 asg *FakeASG metadata *FakeMetadata kms *FakeKMS @@ -48,6 +50,7 @@ func NewFakeAWSServices(clusterId string) *FakeAWSServices { s.region = "us-east-1" s.ec2 = &FakeEC2Impl{aws: s} s.elb = &FakeELB{aws: s} + s.elbv2 = &FakeELBV2{aws: s} s.asg = &FakeASG{aws: s} s.metadata = &FakeMetadata{aws: s} s.kms = &FakeKMS{aws: s} @@ -90,6 +93,10 @@ func (s *FakeAWSServices) LoadBalancing(region string) (ELB, error) { return s.elb, nil } +func (s *FakeAWSServices) LoadBalancingV2(region string) (ELBV2, error) { + return s.elbv2, nil +} + func (s *FakeAWSServices) Autoscaling(region string) (ASG, error) { return s.asg, nil } @@ -246,6 +253,10 @@ func (ec2i *FakeEC2Impl) ModifyInstanceAttribute(request *ec2.ModifyInstanceAttr panic("Not implemented") } +func (ec2i *FakeEC2Impl) DescribeVpcs(request *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) { + return &ec2.DescribeVpcsOutput{Vpcs: []*ec2.Vpc{{CidrBlock: aws.String("172.20.0.0/16")}}}, nil +} + type FakeMetadata struct { aws *FakeAWSServices } @@ -376,6 +387,79 @@ func (self *FakeELB) expectDescribeLoadBalancers(loadBalancerName string) { panic("Not implemented") } +type FakeELBV2 struct { + aws *FakeAWSServices +} + +func (self *FakeELBV2) AddTags(input *elbv2.AddTagsInput) (*elbv2.AddTagsOutput, error) { + panic("Not implemented") +} + +func (self *FakeELBV2) CreateLoadBalancer(*elbv2.CreateLoadBalancerInput) (*elbv2.CreateLoadBalancerOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) DescribeLoadBalancers(*elbv2.DescribeLoadBalancersInput) (*elbv2.DescribeLoadBalancersOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) DeleteLoadBalancer(*elbv2.DeleteLoadBalancerInput) (*elbv2.DeleteLoadBalancerOutput, error) { + panic("Not implemented") +} + +func (self *FakeELBV2) ModifyLoadBalancerAttributes(*elbv2.ModifyLoadBalancerAttributesInput) (*elbv2.ModifyLoadBalancerAttributesOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) DescribeLoadBalancerAttributes(*elbv2.DescribeLoadBalancerAttributesInput) (*elbv2.DescribeLoadBalancerAttributesOutput, error) { + panic("Not implemented") +} + +func (self *FakeELBV2) CreateTargetGroup(*elbv2.CreateTargetGroupInput) (*elbv2.CreateTargetGroupOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) DescribeTargetGroups(*elbv2.DescribeTargetGroupsInput) (*elbv2.DescribeTargetGroupsOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) ModifyTargetGroup(*elbv2.ModifyTargetGroupInput) (*elbv2.ModifyTargetGroupOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) DeleteTargetGroup(*elbv2.DeleteTargetGroupInput) (*elbv2.DeleteTargetGroupOutput, error) { + panic("Not implemented") +} + +func (self *FakeELBV2) DescribeTargetHealth(input *elbv2.DescribeTargetHealthInput) (*elbv2.DescribeTargetHealthOutput, error) { + panic("Not implemented") +} + +func (self *FakeELBV2) DescribeTargetGroupAttributes(*elbv2.DescribeTargetGroupAttributesInput) (*elbv2.DescribeTargetGroupAttributesOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) ModifyTargetGroupAttributes(*elbv2.ModifyTargetGroupAttributesInput) (*elbv2.ModifyTargetGroupAttributesOutput, error) { + panic("Not implemented") +} + +func (self *FakeELBV2) RegisterTargets(*elbv2.RegisterTargetsInput) (*elbv2.RegisterTargetsOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) DeregisterTargets(*elbv2.DeregisterTargetsInput) (*elbv2.DeregisterTargetsOutput, error) { + panic("Not implemented") +} + +func (self *FakeELBV2) CreateListener(*elbv2.CreateListenerInput) (*elbv2.CreateListenerOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) DescribeListeners(*elbv2.DescribeListenersInput) (*elbv2.DescribeListenersOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) DeleteListener(*elbv2.DeleteListenerInput) (*elbv2.DeleteListenerOutput, error) { + panic("Not implemented") +} +func (self *FakeELBV2) ModifyListener(*elbv2.ModifyListenerInput) (*elbv2.ModifyListenerOutput, error) { + panic("Not implemented") +} + +func (self *FakeELBV2) WaitUntilLoadBalancersDeleted(*elbv2.DescribeLoadBalancersInput) error { + panic("Not implemented") +} + type FakeASG struct { aws *FakeAWSServices } diff --git a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go index 9584439b572..d494a0156f4 100644 --- a/pkg/cloudprovider/providers/aws/aws_loadbalancer.go +++ b/pkg/cloudprovider/providers/aws/aws_loadbalancer.go @@ -17,6 +17,7 @@ limitations under the License. package aws import ( + "crypto/sha1" "fmt" "reflect" "strconv" @@ -26,6 +27,7 @@ import ( "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/ec2" "github.com/aws/aws-sdk-go/service/elb" + "github.com/aws/aws-sdk-go/service/elbv2" "github.com/golang/glog" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" @@ -36,6 +38,23 @@ const ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled" const SSLNegotiationPolicyNameFormat = "k8s-SSLNegotiationPolicy-%s" +func isNLB(annotations map[string]string) bool { + if annotations[ServiceAnnotationLoadBalancerType] == "nlb" { + return true + } + return false +} + +type nlbPortMapping struct { + FrontendPort int64 + TrafficPort int64 + ClientCIDR string + + HealthCheckPort int64 + HealthCheckPath string + HealthCheckProtocol string +} + // getLoadBalancerAdditionalTags converts the comma separated list of key-value // pairs in the ServiceAnnotationLoadBalancerAdditionalTags annotation and returns // it as a map. @@ -65,6 +84,760 @@ func getLoadBalancerAdditionalTags(annotations map[string]string) map[string]str return additionalTags } +// ensureLoadBalancerv2 ensures a v2 load balancer is created +func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBalancerName string, mappings []nlbPortMapping, instanceIDs, subnetIDs []string, internalELB bool, annotations map[string]string) (*elbv2.LoadBalancer, error) { + loadBalancer, err := c.describeLoadBalancerv2(loadBalancerName) + if err != nil { + return nil, err + } + + dirty := false + + // Get additional tags set by the user + tags := getLoadBalancerAdditionalTags(annotations) + // Add default tags + tags[TagNameKubernetesService] = namespacedName.String() + tags = c.tagging.buildTags(ResourceLifecycleOwned, tags) + + if loadBalancer == nil { + // Create the LB + createRequest := &elbv2.CreateLoadBalancerInput{ + Type: aws.String(elbv2.LoadBalancerTypeEnumNetwork), + Name: aws.String(loadBalancerName), + } + if internalELB { + createRequest.Scheme = aws.String("internal") + } + + // We are supposed to specify one subnet per AZ. + // TODO: What happens if we have more than one subnet per AZ? + createRequest.SubnetMappings = createSubnetMappings(subnetIDs) + + for k, v := range tags { + createRequest.Tags = append(createRequest.Tags, &elbv2.Tag{ + Key: aws.String(k), Value: aws.String(v), + }) + } + + glog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName) + createResponse, err := c.elbv2.CreateLoadBalancer(createRequest) + if err != nil { + return nil, fmt.Errorf("Error creating load balancer: %q", err) + } + + loadBalancer = createResponse.LoadBalancers[0] + + // Create Target Groups + addTagsInput := &elbv2.AddTagsInput{ + ResourceArns: []*string{}, + Tags: []*elbv2.Tag{}, + } + + for i := range mappings { + // It is easier to keep track of updates by having possibly + // duplicate target groups where the backend port is the same + _, targetGroupArn, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId) + if err != nil { + return nil, fmt.Errorf("Error creating listener: %q", err) + } + addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn) + + } + + // Add tags to targets + for k, v := range tags { + addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{ + Key: aws.String(k), Value: aws.String(v), + }) + } + if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 { + _, err = c.elbv2.AddTags(addTagsInput) + if err != nil { + return nil, fmt.Errorf("Error adding tags after creating Load Balancer: %q", err) + } + } + } else { + // TODO: Sync internal vs non-internal + + // sync mappings + { + listenerDescriptions, err := c.elbv2.DescribeListeners( + &elbv2.DescribeListenersInput{ + LoadBalancerArn: loadBalancer.LoadBalancerArn, + }, + ) + if err != nil { + return nil, fmt.Errorf("Error describing listeners: %q", err) + } + + // actual maps FrontendPort to an elbv2.Listener + actual := map[int64]*elbv2.Listener{} + for _, listener := range listenerDescriptions.Listeners { + actual[*listener.Port] = listener + } + + actualTargetGroups, err := c.elbv2.DescribeTargetGroups( + &elbv2.DescribeTargetGroupsInput{ + LoadBalancerArn: loadBalancer.LoadBalancerArn, + }, + ) + if err != nil { + return nil, fmt.Errorf("Error listing target groups: %q", err) + } + + nodePortTargetGroup := map[int64]*elbv2.TargetGroup{} + for _, targetGroup := range actualTargetGroups.TargetGroups { + nodePortTargetGroup[*targetGroup.Port] = targetGroup + } + + // Create Target Groups + addTagsInput := &elbv2.AddTagsInput{ + ResourceArns: []*string{}, + Tags: []*elbv2.Tag{}, + } + + // Handle additions/modifications + for _, mapping := range mappings { + frontendPort := mapping.FrontendPort + nodePort := mapping.TrafficPort + + // modifications + if listener, ok := actual[frontendPort]; ok { + // nodePort must have changed, we'll need to delete old TG + // and recreate + if targetGroup, ok := nodePortTargetGroup[nodePort]; !ok { + // Create new Target group + targetName := createTargetName(namespacedName, frontendPort, nodePort) + targetGroup, err = c.ensureTargetGroup( + nil, + mapping, + instanceIDs, + targetName, + *loadBalancer.VpcId, + ) + if err != nil { + return nil, err + } + + // Associate new target group to LB + _, err := c.elbv2.ModifyListener(&elbv2.ModifyListenerInput{ + ListenerArn: listener.ListenerArn, + Port: aws.Int64(frontendPort), + Protocol: aws.String("TCP"), + DefaultActions: []*elbv2.Action{{ + TargetGroupArn: targetGroup.TargetGroupArn, + Type: aws.String("forward"), + }}, + }) + if err != nil { + return nil, fmt.Errorf("Error updating load balancer listener: %q", err) + } + + // Delete old target group + _, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{ + TargetGroupArn: listener.DefaultActions[0].TargetGroupArn, + }) + if err != nil { + return nil, fmt.Errorf("Error deleting old target group: %q", err) + } + + } else { + // Run ensureTargetGroup to make sure instances in service are up-to-date + targetName := createTargetName(namespacedName, frontendPort, nodePort) + _, err = c.ensureTargetGroup( + targetGroup, + mapping, + instanceIDs, + targetName, + *loadBalancer.VpcId, + ) + if err != nil { + return nil, err + } + } + dirty = true + continue + } + + // Additions + _, targetGroupArn, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId) + if err != nil { + return nil, err + } + addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn) + dirty = true + } + + frontEndPorts := map[int64]bool{} + for i := range mappings { + frontEndPorts[mappings[i].FrontendPort] = true + } + + // handle deletions + for port, listener := range actual { + if _, ok := frontEndPorts[port]; !ok { + err := c.deleteListenerV2(listener) + if err != nil { + return nil, err + } + dirty = true + } + } + + // Add tags to new targets + for k, v := range tags { + addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{ + Key: aws.String(k), Value: aws.String(v), + }) + } + if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 { + _, err = c.elbv2.AddTags(addTagsInput) + if err != nil { + return nil, fmt.Errorf("Error adding tags after modifying load balancer targets: %q", err) + } + } + } + + // Subnets cannot be modified on NLBs + if dirty { + loadBalancers, err := c.elbv2.DescribeLoadBalancers( + &elbv2.DescribeLoadBalancersInput{ + LoadBalancerArns: []*string{ + loadBalancer.LoadBalancerArn, + }, + }, + ) + if err != nil { + return nil, fmt.Errorf("Error retrieving load balancer after update: %q", err) + } + loadBalancer = loadBalancers.LoadBalancers[0] + } + } + return loadBalancer, nil +} + +// create a valid target group name - ensure name is not over 32 characters +func createTargetName(namespacedName types.NamespacedName, frontendPort, nodePort int64) string { + sha := fmt.Sprintf("%x", sha1.Sum([]byte(namespacedName.String())))[:13] + return fmt.Sprintf("k8s-tg-%s-%d-%d", sha, frontendPort, nodePort) +} + +func (c *Cloud) createListenerV2(loadBalancerArn *string, mapping nlbPortMapping, namespacedName types.NamespacedName, instanceIDs []string, vpcID string) (listener *elbv2.Listener, targetGroupArn *string, err error) { + targetName := createTargetName(namespacedName, mapping.FrontendPort, mapping.TrafficPort) + + glog.Infof("Creating load balancer target group for %v with name: %s", namespacedName, targetName) + target, err := c.ensureTargetGroup( + nil, + mapping, + instanceIDs, + targetName, + vpcID, + ) + if err != nil { + return nil, aws.String(""), err + } + + createListernerInput := &elbv2.CreateListenerInput{ + LoadBalancerArn: loadBalancerArn, + Port: aws.Int64(mapping.FrontendPort), + Protocol: aws.String("TCP"), + DefaultActions: []*elbv2.Action{{ + TargetGroupArn: target.TargetGroupArn, + Type: aws.String(elbv2.ActionTypeEnumForward), + }}, + } + glog.Infof("Creating load balancer listener for %v", namespacedName) + createListenerOutput, err := c.elbv2.CreateListener(createListernerInput) + if err != nil { + return nil, aws.String(""), fmt.Errorf("Error creating load balancer listener: %q", err) + } + return createListenerOutput.Listeners[0], target.TargetGroupArn, nil +} + +// cleans up listener and corresponding target group +func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error { + _, err := c.elbv2.DeleteListener(&elbv2.DeleteListenerInput{ListenerArn: listener.ListenerArn}) + if err != nil { + return fmt.Errorf("Error deleting load balancer listener: %q", err) + } + _, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{TargetGroupArn: listener.DefaultActions[0].TargetGroupArn}) + if err != nil { + return fmt.Errorf("Error deleting load balancer target group: %q", err) + } + return nil +} + +// ensureTargetGroup creates a target group with a set of instances +func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, mapping nlbPortMapping, instances []string, name string, vpcID string) (*elbv2.TargetGroup, error) { + dirty := false + if targetGroup == nil { + + input := &elbv2.CreateTargetGroupInput{ + VpcId: aws.String(vpcID), + Name: aws.String(name), + Port: aws.Int64(mapping.TrafficPort), + Protocol: aws.String("TCP"), + TargetType: aws.String("instance"), + HealthCheckIntervalSeconds: aws.Int64(30), + HealthCheckPort: aws.String("traffic-port"), + HealthCheckProtocol: aws.String("TCP"), + HealthyThresholdCount: aws.Int64(3), + UnhealthyThresholdCount: aws.Int64(3), + } + + input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol) + if mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp { + input.HealthCheckPath = aws.String(mapping.HealthCheckPath) + } + + // Account for externalTrafficPolicy = "Local" + if mapping.HealthCheckPort != mapping.TrafficPort { + input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort))) + } + + result, err := c.elbv2.CreateTargetGroup(input) + if err != nil { + return nil, fmt.Errorf("Error creating load balancer target group: %q", err) + } + if len(result.TargetGroups) != 1 { + return nil, fmt.Errorf("Expected only one target group on CreateTargetGroup, got %d groups", len(result.TargetGroups)) + } + + registerInput := &elbv2.RegisterTargetsInput{ + TargetGroupArn: result.TargetGroups[0].TargetGroupArn, + Targets: []*elbv2.TargetDescription{}, + } + for _, instanceID := range instances { + registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{ + Id: aws.String(string(instanceID)), + Port: aws.Int64(mapping.TrafficPort), + }) + } + + _, err = c.elbv2.RegisterTargets(registerInput) + if err != nil { + return nil, fmt.Errorf("Error registering targets for load balancer: %q", err) + } + + return result.TargetGroups[0], nil + } + + // handle instances in service + { + healthResponse, err := c.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: targetGroup.TargetGroupArn}) + if err != nil { + return nil, fmt.Errorf("Error describing target group health: %q", err) + } + actualIDs := []string{} + for _, healthDescription := range healthResponse.TargetHealthDescriptions { + if healthDescription.TargetHealth.Reason != nil { + switch aws.StringValue(healthDescription.TargetHealth.Reason) { + case elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress: + // We don't need to count this instance in service if it is + // on its way out + default: + actualIDs = append(actualIDs, *healthDescription.Target.Id) + } + } + } + + actual := sets.NewString(actualIDs...) + expected := sets.NewString(instances...) + + additions := expected.Difference(actual) + removals := actual.Difference(expected) + + if len(additions) > 0 { + registerInput := &elbv2.RegisterTargetsInput{ + TargetGroupArn: targetGroup.TargetGroupArn, + Targets: []*elbv2.TargetDescription{}, + } + for instanceID := range additions { + registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{ + Id: aws.String(instanceID), + Port: aws.Int64(mapping.TrafficPort), + }) + } + _, err := c.elbv2.RegisterTargets(registerInput) + if err != nil { + return nil, fmt.Errorf("Error registering new targets in target group: %q", err) + } + dirty = true + } + + if len(removals) > 0 { + deregisterInput := &elbv2.DeregisterTargetsInput{ + TargetGroupArn: targetGroup.TargetGroupArn, + Targets: []*elbv2.TargetDescription{}, + } + for instanceID := range removals { + deregisterInput.Targets = append(deregisterInput.Targets, &elbv2.TargetDescription{ + Id: aws.String(instanceID), + Port: aws.Int64(mapping.TrafficPort), + }) + } + _, err := c.elbv2.DeregisterTargets(deregisterInput) + if err != nil { + return nil, fmt.Errorf("Error trying to deregister targets in target group: %q", err) + } + dirty = true + } + } + + // ensure the health check is correct + { + dirtyHealthCheck := false + + input := &elbv2.ModifyTargetGroupInput{ + TargetGroupArn: targetGroup.TargetGroupArn, + } + + if aws.StringValue(targetGroup.HealthCheckProtocol) != mapping.HealthCheckProtocol { + input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol) + dirtyHealthCheck = true + } + if aws.StringValue(targetGroup.HealthCheckPort) != strconv.Itoa(int(mapping.HealthCheckPort)) { + input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort))) + dirtyHealthCheck = true + } + if mapping.HealthCheckPath != "" && mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp { + input.HealthCheckPath = aws.String(mapping.HealthCheckPath) + dirtyHealthCheck = true + } + + if dirtyHealthCheck { + _, err := c.elbv2.ModifyTargetGroup(input) + if err != nil { + return nil, fmt.Errorf("Error modifying target group health check: %q", err) + } + + dirty = true + } + } + + if dirty { + result, err := c.elbv2.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{ + Names: []*string{aws.String(name)}, + }) + if err != nil { + return nil, fmt.Errorf("Error retrieving target group after creation/update: %q", err) + } + targetGroup = result.TargetGroups[0] + } + + return targetGroup, nil +} + +func portsForNLB(lbName string, sg *ec2.SecurityGroup, clientTraffic bool) sets.Int64 { + response := sets.NewInt64() + var annotation string + if clientTraffic { + annotation = fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName) + } else { + annotation = fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName) + } + + for i := range sg.IpPermissions { + for j := range sg.IpPermissions[i].IpRanges { + description := aws.StringValue(sg.IpPermissions[i].IpRanges[j].Description) + if description == annotation { + // TODO should probably check FromPort == ToPort + response.Insert(aws.Int64Value(sg.IpPermissions[i].FromPort)) + } + } + } + return response +} + +// filterForIPRangeDescription filters in security groups that have IpRange Descriptions that match a loadBalancerName +func filterForIPRangeDescription(securityGroups []*ec2.SecurityGroup, lbName string) []*ec2.SecurityGroup { + response := []*ec2.SecurityGroup{} + clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName) + healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName) + for i := range securityGroups { + for j := range securityGroups[i].IpPermissions { + for k := range securityGroups[i].IpPermissions[j].IpRanges { + description := aws.StringValue(securityGroups[i].IpPermissions[j].IpRanges[k].Description) + if description == clientRule || description == healthRule { + response = append(response, securityGroups[i]) + } + } + } + } + return response +} + +func (c *Cloud) getVpcCidrBlock() (*string, error) { + vpcs, err := c.ec2.DescribeVpcs(&ec2.DescribeVpcsInput{ + VpcIds: []*string{aws.String(c.vpcID)}, + }) + if err != nil { + return nil, fmt.Errorf("Error querying VPC for ELB: %q", err) + } + if len(vpcs.Vpcs) != 1 { + return nil, fmt.Errorf("Error querying VPC for ELB, got %d vpcs for %s", len(vpcs.Vpcs), c.vpcID) + } + return vpcs.Vpcs[0].CidrBlock, nil +} + +// abstraction for updating SG rules +// if clientTraffic is false, then only update HealthCheck rules +func (c *Cloud) updateInstanceSecurityGroupsForNLBTraffic(actualGroups []*ec2.SecurityGroup, desiredSgIds []string, ports []int64, lbName string, clientCidrs []string, clientTraffic bool) error { + + // Map containing the groups we want to make changes on; the ports to make + // changes on; and whether to add or remove it. true to add, false to remove + portChanges := map[string]map[int64]bool{} + + for _, id := range desiredSgIds { + // consider everything an addition for now + if _, ok := portChanges[id]; !ok { + portChanges[id] = make(map[int64]bool) + } + for _, port := range ports { + portChanges[id][port] = true + } + } + + // Compare to actual groups + for _, actualGroup := range actualGroups { + actualGroupID := aws.StringValue(actualGroup.GroupId) + if actualGroupID == "" { + glog.Warning("Ignoring group without ID: ", actualGroup) + continue + } + + addingMap, ok := portChanges[actualGroupID] + if ok { + desiredSet := sets.NewInt64() + for port := range addingMap { + desiredSet.Insert(port) + } + existingSet := portsForNLB(lbName, actualGroup, clientTraffic) + + // remove from portChanges ports that are already allowed + if intersection := desiredSet.Intersection(existingSet); intersection.Len() > 0 { + for p := range intersection { + delete(portChanges[actualGroupID], p) + } + } + + // allowed ports that need to be removed + if difference := existingSet.Difference(desiredSet); difference.Len() > 0 { + for p := range difference { + portChanges[actualGroupID][p] = false + } + } + } + } + + // Make changes we've planned on + for instanceSecurityGroupID, portMap := range portChanges { + adds := []*ec2.IpPermission{} + removes := []*ec2.IpPermission{} + for port, add := range portMap { + if add { + if clientTraffic { + glog.V(2).Infof("Adding rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID) + glog.V(2).Infof("Adding rule for client traffic from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID) + } else { + glog.V(2).Infof("Adding rule for health check traffic from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID) + } + } else { + if clientTraffic { + glog.V(2).Infof("Removing rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID) + glog.V(2).Infof("Removing rule for client traffic from the network load balancer (%s) to instance (%s)", clientCidrs, instanceSecurityGroupID) + } + glog.V(2).Infof("Removing rule for health check traffic from the network load balancer (%s) to instance (%s)", clientCidrs, instanceSecurityGroupID) + } + + if clientTraffic { + clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName) + mtuRuleAnnotation := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, lbName) + // Client Traffic + permission := &ec2.IpPermission{ + FromPort: aws.Int64(port), + ToPort: aws.Int64(port), + IpProtocol: aws.String("tcp"), + } + ranges := []*ec2.IpRange{} + for _, cidr := range clientCidrs { + ranges = append(ranges, &ec2.IpRange{ + CidrIp: aws.String(cidr), + Description: aws.String(clientRuleAnnotation), + }) + } + permission.IpRanges = ranges + if add { + adds = append(adds, permission) + } else { + removes = append(removes, permission) + } + + // MTU discovery + permission = &ec2.IpPermission{ + IpProtocol: aws.String("icmp"), + FromPort: aws.Int64(3), + ToPort: aws.Int64(4), + } + ranges = []*ec2.IpRange{} + for _, cidr := range clientCidrs { + ranges = append(ranges, &ec2.IpRange{ + CidrIp: aws.String(cidr), + Description: aws.String(mtuRuleAnnotation), + }) + } + permission.IpRanges = ranges + if add { + adds = append(adds, permission) + } else { + removes = append(removes, permission) + } + } else { + healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName) + + // NLB HealthCheck + permission := &ec2.IpPermission{ + FromPort: aws.Int64(port), + ToPort: aws.Int64(port), + IpProtocol: aws.String("tcp"), + } + ranges := []*ec2.IpRange{} + for _, cidr := range clientCidrs { + ranges = append(ranges, &ec2.IpRange{ + CidrIp: aws.String(cidr), + Description: aws.String(healthRuleAnnotation), + }) + } + permission.IpRanges = ranges + if add { + adds = append(adds, permission) + } else { + removes = append(removes, permission) + } + } + + } + if len(adds) > 0 { + changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, adds) + if err != nil { + return err + } + if !changed { + glog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID) + } + } + if len(removes) > 0 { + changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, removes) + if err != nil { + return err + } + if !changed { + glog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID) + } + } + } + return nil +} + +// Add SG rules for a given NLB +func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, instances map[awsInstanceID]*ec2.Instance, lbName string, clientCidrs []string) error { + if c.cfg.Global.DisableSecurityGroupIngress { + return nil + } + + vpcCidr, err := c.getVpcCidrBlock() + if err != nil { + return err + } + + // Unlike the classic ELB, NLB does not have a security group that we can + // filter against all existing groups to see if they allow access. Instead + // we use the IpRange.Description field to annotate NLB health check and + // client traffic rules + + // Get the actual list of groups that allow ingress for the load-balancer + var actualGroups []*ec2.SecurityGroup + { + // Server side filter + describeRequest := &ec2.DescribeSecurityGroupsInput{} + filters := []*ec2.Filter{ + newEc2Filter("ip-permission.protocol", "tcp"), + } + describeRequest.Filters = c.tagging.addFilters(filters) + response, err := c.ec2.DescribeSecurityGroups(describeRequest) + if err != nil { + return fmt.Errorf("Error querying security groups for NLB: %q", err) + } + for _, sg := range response { + if !c.tagging.hasClusterTag(sg.Tags) { + continue + } + actualGroups = append(actualGroups, sg) + } + + // client-side filter + // Filter out groups that don't have IP Rules we've annotated for this service + actualGroups = filterForIPRangeDescription(actualGroups, lbName) + } + + taggedSecurityGroups, err := c.getTaggedSecurityGroups() + if err != nil { + return fmt.Errorf("Error querying for tagged security groups: %q", err) + } + + externalTrafficPolicyIsLocal := false + trafficPorts := []int64{} + for i := range mappings { + trafficPorts = append(trafficPorts, mappings[i].TrafficPort) + if mappings[i].TrafficPort != mappings[i].HealthCheckPort { + externalTrafficPolicyIsLocal = true + } + } + + healthCheckPorts := trafficPorts + // if externalTrafficPolicy is Local, all listeners use the same health + // check port + if externalTrafficPolicyIsLocal && len(mappings) > 0 { + healthCheckPorts = []int64{mappings[0].HealthCheckPort} + } + + desiredGroupIds := []string{} + // Scan instances for groups we want open + for _, instance := range instances { + securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups) + if err != nil { + return err + } + + if securityGroup == nil { + glog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId)) + continue + } + + id := aws.StringValue(securityGroup.GroupId) + if id == "" { + glog.Warningf("found security group without id: %v", securityGroup) + continue + } + + desiredGroupIds = append(desiredGroupIds, id) + } + + // Run once for Client traffic + err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, trafficPorts, lbName, clientCidrs, true) + if err != nil { + return err + } + + // Run once for health check traffic + err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, healthCheckPorts, lbName, []string{aws.StringValue(vpcCidr)}, false) + if err != nil { + return err + } + + return nil +} + func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB, proxyProtocol bool, loadBalancerAttributes *elb.LoadBalancerAttributes, annotations map[string]string) (*elb.LoadBalancerDescription, error) { loadBalancer, err := c.describeLoadBalancer(loadBalancerName) if err != nil { @@ -371,6 +1144,17 @@ func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBala return loadBalancer, nil } +func createSubnetMappings(subnetIDs []string) []*elbv2.SubnetMapping { + response := []*elbv2.SubnetMapping{} + + for _, id := range subnetIDs { + // Ignore AllocationId for now + response = append(response, &elbv2.SubnetMapping{SubnetId: aws.String(id)}) + } + + return response +} + // elbProtocolsAreEqual checks if two ELB protocol strings are considered the same // Comparison is case insensitive func elbProtocolsAreEqual(l, r *string) bool { diff --git a/pkg/cloudprovider/providers/aws/aws_loadbalancer_test.go b/pkg/cloudprovider/providers/aws/aws_loadbalancer_test.go index fb1cb12fbdc..6141de5ebbd 100644 --- a/pkg/cloudprovider/providers/aws/aws_loadbalancer_test.go +++ b/pkg/cloudprovider/providers/aws/aws_loadbalancer_test.go @@ -125,3 +125,37 @@ func TestAWSARNEquals(t *testing.T) { } } } + +func TestIsNLB(t *testing.T) { + tests := []struct { + name string + + annotations map[string]string + want bool + }{ + { + "NLB annotation provided", + map[string]string{"service.beta.kubernetes.io/aws-load-balancer-type": "nlb"}, + true, + }, + { + "NLB annotation has invalid value", + map[string]string{"service.beta.kubernetes.io/aws-load-balancer-type": "elb"}, + false, + }, + { + "NLB annotation absent", + map[string]string{}, + false, + }, + } + + for _, test := range tests { + t.Logf("Running test case %s", test.name) + got := isNLB(test.annotations) + + if got != test.want { + t.Errorf("Incorrect value for isNLB() case %s. Got %t, expected %t.", test.name, got, test.want) + } + } +}