mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #11029 from justinsb/fix_aws_security_group_races
AWS: Fix security group races
This commit is contained in:
commit
8d5a6b063c
@ -50,6 +50,12 @@ const ProviderName = "aws"
|
||||
// The tag name we use to differentiate multiple logically independent clusters running in the same AZ
|
||||
const TagNameKubernetesCluster = "KubernetesCluster"
|
||||
|
||||
// We sometimes read to see if something exists; then try to create it if we didn't find it
|
||||
// This can fail once in a consistent system if done in parallel
|
||||
// In an eventually consistent system, it could fail unboundedly
|
||||
// MaxReadThenCreateRetries sets the maxiumum number of attempts we will make
|
||||
const MaxReadThenCreateRetries = 30
|
||||
|
||||
// Abstraction over AWS, to allow mocking/other implementations
|
||||
type AWSServices interface {
|
||||
Compute(region string) (EC2, error)
|
||||
@ -1656,37 +1662,56 @@ func (s *AWSCloud) removeSecurityGroupIngress(securityGroupId string, removePerm
|
||||
// Makes sure the security group exists
|
||||
// Returns the security group id or error
|
||||
func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID string) (string, error) {
|
||||
request := &ec2.DescribeSecurityGroupsInput{}
|
||||
filters := []*ec2.Filter{
|
||||
newEc2Filter("group-name", name),
|
||||
newEc2Filter("vpc-id", vpcID),
|
||||
}
|
||||
request.Filters = s.addFilters(filters)
|
||||
groupID := ""
|
||||
attempt := 0
|
||||
for {
|
||||
attempt++
|
||||
|
||||
securityGroups, err := s.ec2.DescribeSecurityGroups(request)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(securityGroups) >= 1 {
|
||||
if len(securityGroups) > 1 {
|
||||
glog.Warning("Found multiple security groups with name:", name)
|
||||
request := &ec2.DescribeSecurityGroupsInput{}
|
||||
filters := []*ec2.Filter{
|
||||
newEc2Filter("group-name", name),
|
||||
newEc2Filter("vpc-id", vpcID),
|
||||
}
|
||||
request.Filters = s.addFilters(filters)
|
||||
|
||||
securityGroups, err := s.ec2.DescribeSecurityGroups(request)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if len(securityGroups) >= 1 {
|
||||
if len(securityGroups) > 1 {
|
||||
glog.Warning("Found multiple security groups with name:", name)
|
||||
}
|
||||
return orEmpty(securityGroups[0].GroupID), nil
|
||||
}
|
||||
|
||||
createRequest := &ec2.CreateSecurityGroupInput{}
|
||||
createRequest.VPCID = &vpcID
|
||||
createRequest.GroupName = &name
|
||||
createRequest.Description = &description
|
||||
|
||||
createResponse, err := s.ec2.CreateSecurityGroup(createRequest)
|
||||
if err != nil {
|
||||
ignore := false
|
||||
switch err.(type) {
|
||||
case awserr.Error:
|
||||
awsError := err.(awserr.Error)
|
||||
if awsError.Code() == "InvalidGroup.Duplicate" && attempt < MaxReadThenCreateRetries {
|
||||
glog.V(2).Infof("Got InvalidGroup.Duplicate while creating security group (race?); will retry")
|
||||
ignore = true
|
||||
}
|
||||
}
|
||||
if !ignore {
|
||||
glog.Error("error creating security group: ", err)
|
||||
return "", err
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
} else {
|
||||
groupID = orEmpty(createResponse.GroupID)
|
||||
break
|
||||
}
|
||||
return orEmpty(securityGroups[0].GroupID), nil
|
||||
}
|
||||
|
||||
createRequest := &ec2.CreateSecurityGroupInput{}
|
||||
createRequest.VPCID = &vpcID
|
||||
createRequest.GroupName = &name
|
||||
createRequest.Description = &description
|
||||
|
||||
createResponse, err := s.ec2.CreateSecurityGroup(createRequest)
|
||||
if err != nil {
|
||||
glog.Error("error creating security group: ", err)
|
||||
return "", err
|
||||
}
|
||||
|
||||
groupID := orEmpty(createResponse.GroupID)
|
||||
if groupID == "" {
|
||||
return "", fmt.Errorf("created security group, but id was not returned: %s", name)
|
||||
}
|
||||
@ -1702,14 +1727,39 @@ func (s *AWSCloud) ensureSecurityGroup(name string, description string, vpcID st
|
||||
tagRequest := &ec2.CreateTagsInput{}
|
||||
tagRequest.Resources = []*string{&groupID}
|
||||
tagRequest.Tags = tags
|
||||
_, err = s.ec2.CreateTags(tagRequest)
|
||||
if err != nil {
|
||||
if _, err := s.createTags(tagRequest); err != nil {
|
||||
// Not clear how to recover fully from this; we're OK because we don't match on tags, but that is a little odd
|
||||
return "", fmt.Errorf("error tagging security group: %v", err)
|
||||
}
|
||||
return groupID, nil
|
||||
}
|
||||
|
||||
// createTags calls EC2 CreateTags, but adds retry-on-failure logic
|
||||
// We retry mainly because if we create an object, we cannot tag it until it is "fully created" (eventual consistency)
|
||||
// The error code varies though (depending on what we are tagging), so we simply retry on all errors
|
||||
func (s *AWSCloud) createTags(request *ec2.CreateTagsInput) (*ec2.CreateTagsOutput, error) {
|
||||
// TODO: We really should do exponential backoff here
|
||||
attempt := 0
|
||||
maxAttempts := 60
|
||||
|
||||
for {
|
||||
response, err := s.ec2.CreateTags(request)
|
||||
if err == nil {
|
||||
return response, err
|
||||
}
|
||||
|
||||
// We could check that the error is retryable, but the error code changes based on what we are tagging
|
||||
// SecurityGroup: InvalidGroup.NotFound
|
||||
attempt++
|
||||
if attempt > maxAttempts {
|
||||
glog.Warningf("Failed to create tags (too many attempts): %v", err)
|
||||
return response, err
|
||||
}
|
||||
glog.V(2).Infof("Failed to create tags; will retry. Error was %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateTCPLoadBalancer implements TCPLoadBalancer.CreateTCPLoadBalancer
|
||||
// TODO(justinsb): This must be idempotent
|
||||
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anwyay.
|
||||
|
Loading…
Reference in New Issue
Block a user