add TLS support for NLB

This commit is contained in:
M00nF1sh 2019-03-04 13:35:06 -08:00
parent d0c3b70802
commit 1d6fe8c6c2
2 changed files with 153 additions and 95 deletions

View File

@ -3398,7 +3398,7 @@ func (c *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, apiS
listeners := []*elb.Listener{} listeners := []*elb.Listener{}
v2Mappings := []nlbPortMapping{} v2Mappings := []nlbPortMapping{}
portList := getPortSets(annotations[ServiceAnnotationLoadBalancerSSLPorts]) sslPorts := getPortSets(annotations[ServiceAnnotationLoadBalancerSSLPorts])
for _, port := range apiService.Spec.Ports { for _, port := range apiService.Spec.Ports {
if port.Protocol != v1.ProtocolTCP { if port.Protocol != v1.ProtocolTCP {
return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB") return nil, fmt.Errorf("Only TCP LoadBalancer is supported for AWS ELB")
@ -3409,16 +3409,32 @@ func (c *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, apiS
} }
if isNLB(annotations) { if isNLB(annotations) {
v2Mappings = append(v2Mappings, nlbPortMapping{ portMapping := nlbPortMapping{
FrontendPort: int64(port.Port), FrontendPort: int64(port.Port),
TrafficPort: int64(port.NodePort), FrontendProtocol: string(port.Protocol),
TrafficPort: int64(port.NodePort),
TrafficProtocol: string(port.Protocol),
// if externalTrafficPolicy == "Local", we'll override the // if externalTrafficPolicy == "Local", we'll override the
// health check later // health check later
HealthCheckPort: int64(port.NodePort), HealthCheckPort: int64(port.NodePort),
HealthCheckProtocol: elbv2.ProtocolEnumTcp, HealthCheckProtocol: elbv2.ProtocolEnumTcp,
}) }
certificateARN := annotations[ServiceAnnotationLoadBalancerCertificate]
if certificateARN != "" && (sslPorts == nil || sslPorts.numbers.Has(int64(port.Port)) || sslPorts.names.Has(port.Name)) {
portMapping.FrontendProtocol = elbv2.ProtocolEnumTls
portMapping.SSLCertificateARN = certificateARN
portMapping.SSLPolicy = annotations[ServiceAnnotationLoadBalancerSSLNegotiationPolicy]
if backendProtocol := annotations[ServiceAnnotationLoadBalancerBEProtocol]; backendProtocol == "ssl" {
portMapping.TrafficProtocol = elbv2.ProtocolEnumTls
}
}
v2Mappings = append(v2Mappings, portMapping)
} }
listener, err := buildListener(port, annotations, portList) listener, err := buildListener(port, annotations, sslPorts)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -18,8 +18,10 @@ package aws
import ( import (
"crypto/sha1" "crypto/sha1"
"encoding/hex"
"fmt" "fmt"
"reflect" "reflect"
"regexp"
"strconv" "strconv"
"strings" "strings"
@ -61,13 +63,19 @@ func isNLB(annotations map[string]string) bool {
} }
type nlbPortMapping struct { type nlbPortMapping struct {
FrontendPort int64 FrontendPort int64
TrafficPort int64 FrontendProtocol string
ClientCIDR string
TrafficPort int64
TrafficProtocol string
ClientCIDR string
HealthCheckPort int64 HealthCheckPort int64
HealthCheckPath string HealthCheckPath string
HealthCheckProtocol string HealthCheckProtocol string
SSLCertificateARN string
SSLPolicy string
} }
// getLoadBalancerAdditionalTags converts the comma separated list of key-value // getLoadBalancerAdditionalTags converts the comma separated list of key-value
@ -141,40 +149,13 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
} }
loadBalancer = createResponse.LoadBalancers[0] loadBalancer = createResponse.LoadBalancers[0]
// Create Target Groups
resourceArns := make([]*string, 0, len(mappings))
for i := range mappings { for i := range mappings {
// It is easier to keep track of updates by having possibly // It is easier to keep track of updates by having possibly
// duplicate target groups where the backend port is the same // duplicate target groups where the backend port is the same
_, targetGroupArn, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId) _, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId, tags)
if err != nil { if err != nil {
return nil, fmt.Errorf("Error creating listener: %q", err) return nil, fmt.Errorf("Error creating listener: %q", err)
} }
resourceArns = append(resourceArns, targetGroupArn)
}
// Add tags to targets
targetGroupTags := make([]*elbv2.Tag, 0, len(tags))
for k, v := range tags {
targetGroupTags = append(targetGroupTags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
if len(resourceArns) > 0 && len(targetGroupTags) > 0 {
// elbv2.AddTags doesn't allow to tag multiple resources at once
for _, arn := range resourceArns {
_, err = c.elbv2.AddTags(&elbv2.AddTagsInput{
ResourceArns: []*string{arn},
Tags: targetGroupTags,
})
if err != nil {
return nil, fmt.Errorf("Error adding tags after creating Load Balancer: %q", err)
}
}
} }
} else { } else {
// TODO: Sync internal vs non-internal // TODO: Sync internal vs non-internal
@ -210,12 +191,6 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
nodePortTargetGroup[*targetGroup.Port] = targetGroup nodePortTargetGroup[*targetGroup.Port] = targetGroup
} }
// Create Target Groups
addTagsInput := &elbv2.AddTagsInput{
ResourceArns: []*string{},
Tags: []*elbv2.Tag{},
}
// Handle additions/modifications // Handle additions/modifications
for _, mapping := range mappings { for _, mapping := range mappings {
frontendPort := mapping.FrontendPort frontendPort := mapping.FrontendPort
@ -223,53 +198,93 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
// modifications // modifications
if listener, ok := actual[frontendPort]; ok { if listener, ok := actual[frontendPort]; ok {
// nodePort must have changed, we'll need to delete old TG listenerNeedsModification := false
// and recreate
if targetGroup, ok := nodePortTargetGroup[nodePort]; !ok { if aws.StringValue(listener.Protocol) != mapping.FrontendProtocol {
// Create new Target group listenerNeedsModification = true
targetName := createTargetName(namespacedName, frontendPort, nodePort) }
switch mapping.FrontendProtocol {
case elbv2.ProtocolEnumTls:
{
if aws.StringValue(listener.SslPolicy) != mapping.SSLPolicy {
listenerNeedsModification = true
}
if len(listener.Certificates) == 0 || aws.StringValue(listener.Certificates[0].CertificateArn) != mapping.SSLCertificateARN {
listenerNeedsModification = true
}
}
case elbv2.ProtocolEnumTcp:
{
if aws.StringValue(listener.SslPolicy) != "" {
listenerNeedsModification = true
}
if len(listener.Certificates) != 0 {
listenerNeedsModification = true
}
}
}
// recreate targetGroup if trafficPort or protocol changed
targetGroupRecreated := false
targetGroup, ok := nodePortTargetGroup[nodePort]
if !ok || aws.StringValue(targetGroup.Protocol) != mapping.TrafficProtocol {
// create new target group
targetGroup, err = c.ensureTargetGroup( targetGroup, err = c.ensureTargetGroup(
nil, nil,
namespacedName,
mapping, mapping,
instanceIDs, instanceIDs,
targetName,
*loadBalancer.VpcId, *loadBalancer.VpcId,
tags,
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
targetGroupRecreated = true
listenerNeedsModification = true
}
// Associate new target group to LB if listenerNeedsModification {
_, err := c.elbv2.ModifyListener(&elbv2.ModifyListenerInput{ modifyListenerInput := &elbv2.ModifyListenerInput{
ListenerArn: listener.ListenerArn, ListenerArn: listener.ListenerArn,
Port: aws.Int64(frontendPort), Port: aws.Int64(frontendPort),
Protocol: aws.String("TCP"), Protocol: aws.String(mapping.FrontendProtocol),
DefaultActions: []*elbv2.Action{{ DefaultActions: []*elbv2.Action{{
TargetGroupArn: targetGroup.TargetGroupArn, TargetGroupArn: targetGroup.TargetGroupArn,
Type: aws.String("forward"), Type: aws.String("forward"),
}}, }},
}) }
if err != nil { if mapping.FrontendProtocol == elbv2.ProtocolEnumTls {
if mapping.SSLPolicy != "" {
modifyListenerInput.SslPolicy = aws.String(mapping.SSLPolicy)
}
modifyListenerInput.Certificates = []*elbv2.Certificate{
{
CertificateArn: aws.String(mapping.SSLCertificateARN),
},
}
}
if _, err := c.elbv2.ModifyListener(modifyListenerInput); err != nil {
return nil, fmt.Errorf("Error updating load balancer listener: %q", err) return nil, fmt.Errorf("Error updating load balancer listener: %q", err)
} }
}
// Delete old target group // Delete old targetGroup if needed
_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{ if targetGroupRecreated {
if _, err := c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{
TargetGroupArn: listener.DefaultActions[0].TargetGroupArn, TargetGroupArn: listener.DefaultActions[0].TargetGroupArn,
}) }); err != nil {
if err != nil {
return nil, fmt.Errorf("Error deleting old target group: %q", err) return nil, fmt.Errorf("Error deleting old target group: %q", err)
} }
} else { } else {
// Run ensureTargetGroup to make sure instances in service are up-to-date // Run ensureTargetGroup to make sure instances in service are up-to-date
targetName := createTargetName(namespacedName, frontendPort, nodePort)
_, err = c.ensureTargetGroup( _, err = c.ensureTargetGroup(
targetGroup, targetGroup,
namespacedName,
mapping, mapping,
instanceIDs, instanceIDs,
targetName,
*loadBalancer.VpcId, *loadBalancer.VpcId,
tags,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@ -280,11 +295,10 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
} }
// Additions // Additions
_, targetGroupArn, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId) _, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId, tags)
if err != nil { if err != nil {
return nil, err return nil, err
} }
addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn)
dirty = true dirty = true
} }
@ -303,19 +317,6 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
dirty = true dirty = true
} }
} }
// Add tags to new targets
for k, v := range tags {
addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 {
_, err = c.elbv2.AddTags(addTagsInput)
if err != nil {
return nil, fmt.Errorf("Error adding tags after modifying load balancer targets: %q", err)
}
}
} }
desiredLoadBalancerAttributes := map[string]string{} desiredLoadBalancerAttributes := map[string]string{}
@ -394,42 +395,65 @@ func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBa
return loadBalancer, nil return loadBalancer, nil
} }
// create a valid target group name - ensure name is not over 32 characters var invalidELBV2NameRegex = regexp.MustCompile("[^[:alnum:]]")
func createTargetName(namespacedName types.NamespacedName, frontendPort, nodePort int64) string {
sha := fmt.Sprintf("%x", sha1.Sum([]byte(namespacedName.String())))[:13] // buildTargetGroupName will build unique name for targetGroup of service & port.
return fmt.Sprintf("k8s-tg-%s-%d-%d", sha, frontendPort, nodePort) // the name is in format k8s-{namespace:8}-{name:8}-{uuid:10} (chosen to benefit most common use cases).
// Note: targetProtocol & targetType are included since they cannot be modified on existing targetGroup.
func (c *Cloud) buildTargetGroupName(serviceName types.NamespacedName, servicePort int64, targetProtocol string, targetType string) string {
hasher := sha1.New()
_, _ = hasher.Write([]byte(c.tagging.clusterID()))
_, _ = hasher.Write([]byte(serviceName.Namespace))
_, _ = hasher.Write([]byte(serviceName.Name))
_, _ = hasher.Write([]byte(strconv.FormatInt(servicePort, 10)))
_, _ = hasher.Write([]byte(targetProtocol))
_, _ = hasher.Write([]byte(targetType))
tgUUID := hex.EncodeToString(hasher.Sum(nil))
sanitizedNamespace := invalidELBV2NameRegex.ReplaceAllString(serviceName.Namespace, "")
sanitizedServiceName := invalidELBV2NameRegex.ReplaceAllString(serviceName.Name, "")
return fmt.Sprintf("k8s-%.8s-%.8s-%.10s", sanitizedNamespace, sanitizedServiceName, tgUUID)
} }
func (c *Cloud) createListenerV2(loadBalancerArn *string, mapping nlbPortMapping, namespacedName types.NamespacedName, instanceIDs []string, vpcID string) (listener *elbv2.Listener, targetGroupArn *string, err error) { func (c *Cloud) createListenerV2(loadBalancerArn *string, mapping nlbPortMapping, namespacedName types.NamespacedName, instanceIDs []string, vpcID string, tags map[string]string) (listener *elbv2.Listener, err error) {
targetName := createTargetName(namespacedName, mapping.FrontendPort, mapping.TrafficPort)
klog.Infof("Creating load balancer target group for %v with name: %s", namespacedName, targetName)
target, err := c.ensureTargetGroup( target, err := c.ensureTargetGroup(
nil, nil,
namespacedName,
mapping, mapping,
instanceIDs, instanceIDs,
targetName,
vpcID, vpcID,
tags,
) )
if err != nil { if err != nil {
return nil, aws.String(""), err return nil, err
} }
createListernerInput := &elbv2.CreateListenerInput{ createListernerInput := &elbv2.CreateListenerInput{
LoadBalancerArn: loadBalancerArn, LoadBalancerArn: loadBalancerArn,
Port: aws.Int64(mapping.FrontendPort), Port: aws.Int64(mapping.FrontendPort),
Protocol: aws.String("TCP"), Protocol: aws.String(mapping.FrontendProtocol),
DefaultActions: []*elbv2.Action{{ DefaultActions: []*elbv2.Action{{
TargetGroupArn: target.TargetGroupArn, TargetGroupArn: target.TargetGroupArn,
Type: aws.String(elbv2.ActionTypeEnumForward), Type: aws.String(elbv2.ActionTypeEnumForward),
}}, }},
} }
if mapping.FrontendProtocol == "TLS" {
if mapping.SSLPolicy != "" {
createListernerInput.SslPolicy = aws.String(mapping.SSLPolicy)
}
createListernerInput.Certificates = []*elbv2.Certificate{
{
CertificateArn: aws.String(mapping.SSLCertificateARN),
},
}
}
klog.Infof("Creating load balancer listener for %v", namespacedName) klog.Infof("Creating load balancer listener for %v", namespacedName)
createListenerOutput, err := c.elbv2.CreateListener(createListernerInput) createListenerOutput, err := c.elbv2.CreateListener(createListernerInput)
if err != nil { if err != nil {
return nil, aws.String(""), fmt.Errorf("Error creating load balancer listener: %q", err) return nil, fmt.Errorf("Error creating load balancer listener: %q", err)
} }
return createListenerOutput.Listeners[0], target.TargetGroupArn, nil return createListenerOutput.Listeners[0], nil
} }
// cleans up listener and corresponding target group // cleans up listener and corresponding target group
@ -445,17 +469,19 @@ func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error {
return nil return nil
} }
// ensureTargetGroup creates a target group with a set of instances // ensureTargetGroup creates a target group with a set of instances.
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, mapping nlbPortMapping, instances []string, name string, vpcID string) (*elbv2.TargetGroup, error) { func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName types.NamespacedName, mapping nlbPortMapping, instances []string, vpcID string, tags map[string]string) (*elbv2.TargetGroup, error) {
dirty := false dirty := false
if targetGroup == nil { if targetGroup == nil {
targetType := "instance"
name := c.buildTargetGroupName(serviceName, mapping.FrontendPort, mapping.TrafficProtocol, targetType)
klog.Infof("Creating load balancer target group for %v with name: %s", serviceName, name)
input := &elbv2.CreateTargetGroupInput{ input := &elbv2.CreateTargetGroupInput{
VpcId: aws.String(vpcID), VpcId: aws.String(vpcID),
Name: aws.String(name), Name: aws.String(name),
Port: aws.Int64(mapping.TrafficPort), Port: aws.Int64(mapping.TrafficPort),
Protocol: aws.String("TCP"), Protocol: aws.String(mapping.TrafficProtocol),
TargetType: aws.String("instance"), TargetType: aws.String(targetType),
HealthCheckIntervalSeconds: aws.Int64(30), HealthCheckIntervalSeconds: aws.Int64(30),
HealthCheckPort: aws.String("traffic-port"), HealthCheckPort: aws.String("traffic-port"),
HealthCheckProtocol: aws.String("TCP"), HealthCheckProtocol: aws.String("TCP"),
@ -481,6 +507,22 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, mapping nlbPor
return nil, fmt.Errorf("Expected only one target group on CreateTargetGroup, got %d groups", len(result.TargetGroups)) return nil, fmt.Errorf("Expected only one target group on CreateTargetGroup, got %d groups", len(result.TargetGroups))
} }
if len(tags) != 0 {
targetGroupTags := make([]*elbv2.Tag, 0, len(tags))
for k, v := range tags {
targetGroupTags = append(targetGroupTags, &elbv2.Tag{
Key: aws.String(k), Value: aws.String(v),
})
}
tgArn := aws.StringValue(result.TargetGroups[0].TargetGroupArn)
if _, err := c.elbv2.AddTags(&elbv2.AddTagsInput{
ResourceArns: []*string{aws.String(tgArn)},
Tags: targetGroupTags,
}); err != nil {
return nil, fmt.Errorf("error adding tags for targetGroup %s due to %q", tgArn, err)
}
}
registerInput := &elbv2.RegisterTargetsInput{ registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: result.TargetGroups[0].TargetGroupArn, TargetGroupArn: result.TargetGroups[0].TargetGroupArn,
Targets: []*elbv2.TargetDescription{}, Targets: []*elbv2.TargetDescription{},
@ -595,7 +637,7 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, mapping nlbPor
if dirty { if dirty {
result, err := c.elbv2.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{ result, err := c.elbv2.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{
Names: []*string{aws.String(name)}, TargetGroupArns: []*string{targetGroup.TargetGroupArn},
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("Error retrieving target group after creation/update: %q", err) return nil, fmt.Errorf("Error retrieving target group after creation/update: %q", err)