Add etag for NSG updates so as to fix nsg race condition

This commit is contained in:
Pengfei Ni
2019-04-29 13:01:00 +08:00
parent 69718b975d
commit 34e898ac5e
2 changed files with 63 additions and 12 deletions

View File

@@ -22,6 +22,7 @@ import (
"github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-03-01/compute" "github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2019-03-01/compute"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network" "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network"
"github.com/Azure/go-autorest/autorest/to"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@@ -146,7 +147,7 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec
ctx, cancel := getContextWithCancel() ctx, cancel := getContextWithCancel()
defer cancel() defer cancel()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg) resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name) klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
if err == nil { if err == nil {
if isSuccessHTTPResponse(resp) { if isSuccessHTTPResponse(resp) {
@@ -156,6 +157,11 @@ func (az *Cloud) CreateOrUpdateSecurityGroup(service *v1.Service, sg network.Sec
return fmt.Errorf("HTTP response %q", resp.Status) return fmt.Errorf("HTTP response %q", resp.Status)
} }
} }
// Invalidate the cache because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.nsgCache.Delete(*sg.Name)
}
return err return err
} }
@@ -168,14 +174,20 @@ func (az *Cloud) CreateOrUpdateSGWithRetry(service *v1.Service, sg network.Secur
ctx, cancel := getContextWithCancel() ctx, cancel := getContextWithCancel()
defer cancel() defer cancel()
resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg) resp, err := az.SecurityGroupsClient.CreateOrUpdate(ctx, az.ResourceGroup, *sg.Name, sg, to.String(sg.Etag))
klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name) klog.V(10).Infof("SecurityGroupsClient.CreateOrUpdate(%s): end", *sg.Name)
done, err := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err) done, retryError := az.processHTTPRetryResponse(service, "CreateOrUpdateSecurityGroup", resp, err)
if done && err == nil { if done && err == nil {
// Invalidate the cache right after updating // Invalidate the cache right after updating
az.nsgCache.Delete(*sg.Name) az.nsgCache.Delete(*sg.Name)
} }
return done, err
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.nsgCache.Delete(*sg.Name)
return true, err
}
return done, retryError
}) })
} }
@@ -538,17 +550,22 @@ func isSuccessHTTPResponse(resp *http.Response) bool {
} }
func shouldRetryHTTPRequest(resp *http.Response, err error) bool { func shouldRetryHTTPRequest(resp *http.Response, err error) bool {
if err != nil {
return true
}
if resp != nil { if resp != nil {
// HTTP 4xx or 5xx suggests we should retry // HTTP 412 (StatusPreconditionFailed) means etag mismatch, hence we shouldn't retry.
if resp.StatusCode == http.StatusPreconditionFailed {
return false
}
// HTTP 4xx (except 412) or 5xx suggests we should retry.
if 399 < resp.StatusCode && resp.StatusCode < 600 { if 399 < resp.StatusCode && resp.StatusCode < 600 {
return true return true
} }
} }
if err != nil {
return true
}
return false return false
} }

View File

@@ -81,7 +81,7 @@ type SubnetsClient interface {
// SecurityGroupsClient defines needed functions for azure network.SecurityGroupsClient // SecurityGroupsClient defines needed functions for azure network.SecurityGroupsClient
type SecurityGroupsClient interface { type SecurityGroupsClient interface {
CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error)
Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error)
Get(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error) Get(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, expand string) (result network.SecurityGroup, err error)
List(ctx context.Context, resourceGroupName string) (result []network.SecurityGroup, err error) List(ctx context.Context, resourceGroupName string) (result []network.SecurityGroup, err error)
@@ -714,7 +714,7 @@ func newAzSecurityGroupsClient(config *azClientConfig) *azSecurityGroupsClient {
} }
} }
func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup) (resp *http.Response, err error) { func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (resp *http.Response, err error) {
/* Write rate limiting */ /* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() { if !az.rateLimiterWriter.TryAccept() {
err = createRateLimitErr(true, "NSGCreateOrUpdate") err = createRateLimitErr(true, "NSGCreateOrUpdate")
@@ -727,7 +727,13 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGr
}() }()
mc := newMetricContext("security_groups", "create_or_update", resourceGroupName, az.client.SubscriptionID) mc := newMetricContext("security_groups", "create_or_update", resourceGroupName, az.client.SubscriptionID)
future, err := az.client.CreateOrUpdate(ctx, resourceGroupName, networkSecurityGroupName, parameters) req, err := az.createOrUpdatePreparer(ctx, resourceGroupName, networkSecurityGroupName, parameters, etag)
if err != nil {
mc.Observe(err)
return nil, err
}
future, err := az.client.CreateOrUpdateSender(req)
if err != nil { if err != nil {
mc.Observe(err) mc.Observe(err)
return future.Response(), err return future.Response(), err
@@ -738,6 +744,34 @@ func (az *azSecurityGroupsClient) CreateOrUpdate(ctx context.Context, resourceGr
return future.Response(), err return future.Response(), err
} }
// createOrUpdatePreparer prepares the CreateOrUpdate request.
func (az *azSecurityGroupsClient) createOrUpdatePreparer(ctx context.Context, resourceGroupName string, networkSecurityGroupName string, parameters network.SecurityGroup, etag string) (*http.Request, error) {
pathParameters := map[string]interface{}{
"networkSecurityGroupName": autorest.Encode("path", networkSecurityGroupName),
"resourceGroupName": autorest.Encode("path", resourceGroupName),
"subscriptionId": autorest.Encode("path", az.client.SubscriptionID),
}
const APIVersion = "2017-09-01"
queryParameters := map[string]interface{}{
"api-version": APIVersion,
}
preparerDecorators := []autorest.PrepareDecorator{
autorest.AsContentType("application/json; charset=utf-8"),
autorest.AsPut(),
autorest.WithBaseURL(az.client.BaseURI),
autorest.WithPathParameters("/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Network/networkSecurityGroups/{networkSecurityGroupName}", pathParameters),
autorest.WithJSON(parameters),
autorest.WithQueryParameters(queryParameters),
}
if etag != "" {
preparerDecorators = append(preparerDecorators, autorest.WithHeader("If-Match", autorest.String(etag)))
}
preparer := autorest.CreatePreparer(preparerDecorators...)
return preparer.Prepare((&http.Request{}).WithContext(ctx))
}
func (az *azSecurityGroupsClient) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) { func (az *azSecurityGroupsClient) Delete(ctx context.Context, resourceGroupName string, networkSecurityGroupName string) (resp *http.Response, err error) {
/* Write rate limiting */ /* Write rate limiting */
if !az.rateLimiterWriter.TryAccept() { if !az.rateLimiterWriter.TryAccept() {