Merge pull request #101592 from M00nF1sh/target-chunk

Register/Deregister Targets in chunks for AWS TargetGroup
This commit is contained in:
Kubernetes Prow Robot 2021-04-30 20:15:58 -07:00 committed by GitHub
commit b18aea1499
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 563 additions and 73 deletions

View File

@ -65,6 +65,10 @@ var (
defaultNlbHealthCheckThreshold = int64(3)
defaultHealthCheckPort = "traffic-port"
defaultHealthCheckPath = "/"
// Defaults for ELB Target operations
defaultRegisterTargetsChunkSize = 100
defaultDeregisterTargetsChunkSize = 100
)
func isNLB(annotations map[string]string) bool {
@ -563,6 +567,7 @@ func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error {
// ensureTargetGroup creates a target group with a set of instances.
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName types.NamespacedName, mapping nlbPortMapping, instances []string, vpcID string, tags map[string]string) (*elbv2.TargetGroup, error) {
dirty := false
expectedTargets := c.computeTargetGroupExpectedTargets(instances, mapping.TrafficPort)
if targetGroup == nil {
targetType := "instance"
name := c.buildTargetGroupName(serviceName, mapping.FrontendPort, mapping.TrafficPort, mapping.TrafficProtocol, targetType, mapping)
@ -609,86 +614,23 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName ty
}
}
registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: result.TargetGroups[0].TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
tg := result.TargetGroups[0]
tgARN := aws.StringValue(tg.TargetGroupArn)
if err := c.ensureTargetGroupTargets(tgARN, expectedTargets, nil); err != nil {
return nil, err
}
for _, instanceID := range instances {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(string(instanceID)),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err = c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("error registering targets for load balancer: %q", err)
}
return result.TargetGroups[0], nil
return tg, nil
}
// handle instances in service
{
healthResponse, err := c.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: targetGroup.TargetGroupArn})
tgARN := aws.StringValue(targetGroup.TargetGroupArn)
actualTargets, err := c.obtainTargetGroupActualTargets(tgARN)
if err != nil {
return nil, fmt.Errorf("error describing target group health: %q", err)
return nil, err
}
actualIDs := []string{}
for _, healthDescription := range healthResponse.TargetHealthDescriptions {
if aws.StringValue(healthDescription.TargetHealth.State) == elbv2.TargetHealthStateEnumHealthy {
actualIDs = append(actualIDs, *healthDescription.Target.Id)
} else if healthDescription.TargetHealth.Reason != nil {
switch aws.StringValue(healthDescription.TargetHealth.Reason) {
case elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress:
// We don't need to count this instance in service if it is
// on its way out
default:
actualIDs = append(actualIDs, *healthDescription.Target.Id)
}
}
}
actual := sets.NewString(actualIDs...)
expected := sets.NewString(instances...)
additions := expected.Difference(actual)
removals := actual.Difference(expected)
if len(additions) > 0 {
registerInput := &elbv2.RegisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range additions {
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.RegisterTargets(registerInput)
if err != nil {
return nil, fmt.Errorf("error registering new targets in target group: %q", err)
}
dirty = true
}
if len(removals) > 0 {
deregisterInput := &elbv2.DeregisterTargetsInput{
TargetGroupArn: targetGroup.TargetGroupArn,
Targets: []*elbv2.TargetDescription{},
}
for instanceID := range removals {
deregisterInput.Targets = append(deregisterInput.Targets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(mapping.TrafficPort),
})
}
_, err := c.elbv2.DeregisterTargets(deregisterInput)
if err != nil {
return nil, fmt.Errorf("error trying to deregister targets in target group: %q", err)
}
dirty = true
if err := c.ensureTargetGroupTargets(tgARN, expectedTargets, actualTargets); err != nil {
return nil, err
}
}
@ -738,6 +680,101 @@ func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, serviceName ty
return targetGroup, nil
}
func (c *Cloud) ensureTargetGroupTargets(tgARN string, expectedTargets []*elbv2.TargetDescription, actualTargets []*elbv2.TargetDescription) error {
targetsToRegister, targetsToDeregister := c.diffTargetGroupTargets(expectedTargets, actualTargets)
if len(targetsToRegister) > 0 {
targetsToRegisterChunks := c.chunkTargetDescriptions(targetsToRegister, defaultRegisterTargetsChunkSize)
for _, targetsChunk := range targetsToRegisterChunks {
req := &elbv2.RegisterTargetsInput{
TargetGroupArn: aws.String(tgARN),
Targets: targetsChunk,
}
if _, err := c.elbv2.RegisterTargets(req); err != nil {
return fmt.Errorf("error trying to register targets in target group: %q", err)
}
}
}
if len(targetsToDeregister) > 0 {
targetsToDeregisterChunks := c.chunkTargetDescriptions(targetsToDeregister, defaultDeregisterTargetsChunkSize)
for _, targetsChunk := range targetsToDeregisterChunks {
req := &elbv2.DeregisterTargetsInput{
TargetGroupArn: aws.String(tgARN),
Targets: targetsChunk,
}
if _, err := c.elbv2.DeregisterTargets(req); err != nil {
return fmt.Errorf("error trying to deregister targets in target group: %q", err)
}
}
}
return nil
}
func (c *Cloud) computeTargetGroupExpectedTargets(instanceIDs []string, port int64) []*elbv2.TargetDescription {
expectedTargets := make([]*elbv2.TargetDescription, 0, len(instanceIDs))
for _, instanceID := range instanceIDs {
expectedTargets = append(expectedTargets, &elbv2.TargetDescription{
Id: aws.String(instanceID),
Port: aws.Int64(port),
})
}
return expectedTargets
}
func (c *Cloud) obtainTargetGroupActualTargets(tgARN string) ([]*elbv2.TargetDescription, error) {
req := &elbv2.DescribeTargetHealthInput{
TargetGroupArn: aws.String(tgARN),
}
resp, err := c.elbv2.DescribeTargetHealth(req)
if err != nil {
return nil, fmt.Errorf("error describing target group health: %q", err)
}
actualTargets := make([]*elbv2.TargetDescription, 0, len(resp.TargetHealthDescriptions))
for _, targetDesc := range resp.TargetHealthDescriptions {
if targetDesc.TargetHealth.Reason != nil && aws.StringValue(targetDesc.TargetHealth.Reason) == elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress {
continue
}
actualTargets = append(actualTargets, targetDesc.Target)
}
return actualTargets, nil
}
// diffTargetGroupTargets computes the targets to register and targets to deregister based on existingTargets and desired instances.
func (c *Cloud) diffTargetGroupTargets(expectedTargets []*elbv2.TargetDescription, actualTargets []*elbv2.TargetDescription) (targetsToRegister []*elbv2.TargetDescription, targetsToDeregister []*elbv2.TargetDescription) {
expectedTargetsByUID := make(map[string]*elbv2.TargetDescription, len(expectedTargets))
for _, target := range expectedTargets {
targetUID := fmt.Sprintf("%v:%v", aws.StringValue(target.Id), aws.Int64Value(target.Port))
expectedTargetsByUID[targetUID] = target
}
actualTargetsByUID := make(map[string]*elbv2.TargetDescription, len(actualTargets))
for _, target := range actualTargets {
targetUID := fmt.Sprintf("%v:%v", aws.StringValue(target.Id), aws.Int64Value(target.Port))
actualTargetsByUID[targetUID] = target
}
expectedTargetsUIDs := sets.StringKeySet(expectedTargetsByUID)
actualTargetsUIDs := sets.StringKeySet(actualTargetsByUID)
for _, targetUID := range expectedTargetsUIDs.Difference(actualTargetsUIDs).List() {
targetsToRegister = append(targetsToRegister, expectedTargetsByUID[targetUID])
}
for _, targetUID := range actualTargetsUIDs.Difference(expectedTargetsUIDs).List() {
targetsToDeregister = append(targetsToDeregister, actualTargetsByUID[targetUID])
}
return targetsToRegister, targetsToDeregister
}
// chunkTargetDescriptions will split slice of TargetDescription into chunks
func (c *Cloud) chunkTargetDescriptions(targets []*elbv2.TargetDescription, chunkSize int) [][]*elbv2.TargetDescription {
var chunks [][]*elbv2.TargetDescription
for i := 0; i < len(targets); i += chunkSize {
end := i + chunkSize
if end > len(targets) {
end = len(targets)
}
chunks = append(chunks, targets[i:end])
}
return chunks
}
// updateInstanceSecurityGroupsForNLB will adjust securityGroup's settings to allow inbound traffic into instances from clientCIDRs and portMappings.
// TIP: if either instances or clientCIDRs or portMappings are nil, then the securityGroup rules for lbName are cleared.
func (c *Cloud) updateInstanceSecurityGroupsForNLB(lbName string, instances map[InstanceID]*ec2.Instance, subnetCIDRs []string, clientCIDRs []string, portMappings []nlbPortMapping) error {

View File

@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/elb"
"github.com/aws/aws-sdk-go/service/elbv2"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
@ -540,3 +541,455 @@ func TestFilterTargetNodes(t *testing.T) {
})
}
}
func TestCloud_chunkTargetDescriptions(t *testing.T) {
type args struct {
targets []*elbv2.TargetDescription
chunkSize int
}
tests := []struct {
name string
args args
want [][]*elbv2.TargetDescription
}{
{
name: "can be evenly chunked",
args: args{
targets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdefg1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg3"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg4"),
Port: aws.Int64(8080),
},
},
chunkSize: 2,
},
want: [][]*elbv2.TargetDescription{
{
{
Id: aws.String("i-abcdefg1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg2"),
Port: aws.Int64(8080),
},
},
{
{
Id: aws.String("i-abcdefg3"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg4"),
Port: aws.Int64(8080),
},
},
},
},
{
name: "cannot be evenly chunked",
args: args{
targets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdefg1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg3"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg4"),
Port: aws.Int64(8080),
},
},
chunkSize: 3,
},
want: [][]*elbv2.TargetDescription{
{
{
Id: aws.String("i-abcdefg1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg3"),
Port: aws.Int64(8080),
},
},
{
{
Id: aws.String("i-abcdefg4"),
Port: aws.Int64(8080),
},
},
},
},
{
name: "chunkSize equal to total count",
args: args{
targets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdefg1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg3"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg4"),
Port: aws.Int64(8080),
},
},
chunkSize: 4,
},
want: [][]*elbv2.TargetDescription{
{
{
Id: aws.String("i-abcdefg1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg3"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg4"),
Port: aws.Int64(8080),
},
},
},
},
{
name: "chunkSize greater than total count",
args: args{
targets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdefg1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg3"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg4"),
Port: aws.Int64(8080),
},
},
chunkSize: 10,
},
want: [][]*elbv2.TargetDescription{
{
{
Id: aws.String("i-abcdefg1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg3"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdefg4"),
Port: aws.Int64(8080),
},
},
},
},
{
name: "chunk nil slice",
args: args{
targets: nil,
chunkSize: 2,
},
want: nil,
},
{
name: "chunk empty slice",
args: args{
targets: []*elbv2.TargetDescription{},
chunkSize: 2,
},
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Cloud{}
got := c.chunkTargetDescriptions(tt.args.targets, tt.args.chunkSize)
assert.Equal(t, tt.want, got)
})
}
}
func TestCloud_diffTargetGroupTargets(t *testing.T) {
type args struct {
expectedTargets []*elbv2.TargetDescription
actualTargets []*elbv2.TargetDescription
}
tests := []struct {
name string
args args
wantTargetsToRegister []*elbv2.TargetDescription
wantTargetsToDeregister []*elbv2.TargetDescription
}{
{
name: "all targets to register",
args: args{
expectedTargets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
},
actualTargets: nil,
},
wantTargetsToRegister: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
},
wantTargetsToDeregister: nil,
},
{
name: "all targets to deregister",
args: args{
expectedTargets: nil,
actualTargets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
},
},
wantTargetsToRegister: nil,
wantTargetsToDeregister: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
},
},
{
name: "some targets to register and deregister",
args: args{
expectedTargets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef4"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef5"),
Port: aws.Int64(8080),
},
},
actualTargets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef3"),
Port: aws.Int64(8080),
},
},
},
wantTargetsToRegister: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef4"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef5"),
Port: aws.Int64(8080),
},
},
wantTargetsToDeregister: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef3"),
Port: aws.Int64(8080),
},
},
},
{
name: "both expected and actual targets are empty",
args: args{
expectedTargets: nil,
actualTargets: nil,
},
wantTargetsToRegister: nil,
wantTargetsToDeregister: nil,
},
{
name: "expected and actual targets equals",
args: args{
expectedTargets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef3"),
Port: aws.Int64(8080),
},
},
actualTargets: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef3"),
Port: aws.Int64(8080),
},
},
},
wantTargetsToRegister: nil,
wantTargetsToDeregister: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Cloud{}
gotTargetsToRegister, gotTargetsToDeregister := c.diffTargetGroupTargets(tt.args.expectedTargets, tt.args.actualTargets)
assert.Equal(t, tt.wantTargetsToRegister, gotTargetsToRegister)
assert.Equal(t, tt.wantTargetsToDeregister, gotTargetsToDeregister)
})
}
}
func TestCloud_computeTargetGroupExpectedTargets(t *testing.T) {
type args struct {
instanceIDs []string
port int64
}
tests := []struct {
name string
args args
want []*elbv2.TargetDescription
}{
{
name: "no instance",
args: args{
instanceIDs: nil,
port: 8080,
},
want: []*elbv2.TargetDescription{},
},
{
name: "one instance",
args: args{
instanceIDs: []string{"i-abcdef1"},
port: 8080,
},
want: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
},
},
{
name: "multiple instances",
args: args{
instanceIDs: []string{"i-abcdef1", "i-abcdef2", "i-abcdef3"},
port: 8080,
},
want: []*elbv2.TargetDescription{
{
Id: aws.String("i-abcdef1"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef2"),
Port: aws.Int64(8080),
},
{
Id: aws.String("i-abcdef3"),
Port: aws.Int64(8080),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Cloud{}
got := c.computeTargetGroupExpectedTargets(tt.args.instanceIDs, tt.args.port)
assert.Equal(t, tt.want, got)
})
}
}