Set ETAG when updating Azure loadbalancer, route and route table

This commit is contained in:
Pengfei Ni 2019-05-06 15:58:04 +08:00
parent a019f17919
commit 7ca1c832d2
4 changed files with 73 additions and 24 deletions

View File

@ -197,7 +197,7 @@ func (az *Cloud) CreateOrUpdateLB(service *v1.Service, lb network.LoadBalancer)
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, az.ResourceGroup, *lb.Name, lb)
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, az.ResourceGroup, *lb.Name, lb, to.String(lb.Etag))
klog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name)
if err == nil {
if isSuccessHTTPResponse(resp) {
@ -207,6 +207,11 @@ func (az *Cloud) CreateOrUpdateLB(service *v1.Service, lb network.LoadBalancer)
return fmt.Errorf("HTTP response %q", resp.Status)
}
}
// Invalidate the cache because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.lbCache.Delete(*lb.Name)
}
return err
}
@ -219,14 +224,20 @@ func (az *Cloud) createOrUpdateLBWithRetry(service *v1.Service, lb network.LoadB
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, az.ResourceGroup, *lb.Name, lb)
resp, err := az.LoadBalancerClient.CreateOrUpdate(ctx, az.ResourceGroup, *lb.Name, lb, to.String(lb.Etag))
klog.V(10).Infof("LoadBalancerClient.CreateOrUpdate(%s): end", *lb.Name)
done, err := az.processHTTPRetryResponse(service, "CreateOrUpdateLoadBalancer", resp, err)
done, retryError := az.processHTTPRetryResponse(service, "CreateOrUpdateLoadBalancer", resp, err)
if done && err == nil {
// Invalidate the cache right after updating
az.lbCache.Delete(*lb.Name)
}
return done, err
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.nsgCache.Delete(*lb.Name)
return true, err
}
return done, retryError
})
}
@ -441,7 +452,10 @@ func (az *Cloud) CreateOrUpdateRouteTable(routeTable network.RouteTable) error {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RouteTablesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeTable)
resp, err := az.RouteTablesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeTable, to.String(routeTable.Etag))
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.rtCache.Delete(*routeTable.Name)
}
return az.processHTTPResponse(nil, "", resp, err)
}
@ -454,8 +468,19 @@ func (az *Cloud) createOrUpdateRouteTableWithRetry(routeTable network.RouteTable
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RouteTablesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeTable)
return az.processHTTPRetryResponse(nil, "", resp, err)
resp, err := az.RouteTablesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, routeTable, to.String(routeTable.Etag))
done, retryError := az.processHTTPRetryResponse(nil, "", resp, err)
if done && err == nil {
az.rtCache.Delete(*routeTable.Name)
return done, nil
}
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.rtCache.Delete(*routeTable.Name)
return true, err
}
return done, retryError
})
}
@ -465,8 +490,11 @@ func (az *Cloud) CreateOrUpdateRoute(route network.Route) error {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RoutesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, *route.Name, route)
resp, err := az.RoutesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, *route.Name, route, to.String(route.Etag))
klog.V(10).Infof("RoutesClient.CreateOrUpdate(%s): end", *route.Name)
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.rtCache.Delete(az.RouteTableName)
}
return az.processHTTPResponse(nil, "", resp, err)
}
@ -479,9 +507,20 @@ func (az *Cloud) createOrUpdateRouteWithRetry(route network.Route) error {
ctx, cancel := getContextWithCancel()
defer cancel()
resp, err := az.RoutesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, *route.Name, route)
resp, err := az.RoutesClient.CreateOrUpdate(ctx, az.RouteTableResourceGroup, az.RouteTableName, *route.Name, route, to.String(route.Etag))
klog.V(10).Infof("RoutesClient.CreateOrUpdate(%s): end", *route.Name)
return az.processHTTPRetryResponse(nil, "", resp, err)
done, retryError := az.processHTTPRetryResponse(nil, "", resp, err)
if done && err == nil {
az.rtCache.Delete(az.RouteTableName)
return done, nil
}
// Invalidate the cache and abort backoff because ETAG precondition mismatch.
if resp != nil && resp.StatusCode == http.StatusPreconditionFailed {
az.rtCache.Delete(az.RouteTableName)
return true, err
}
return done, retryError
})
}

View File

@ -52,7 +52,7 @@ func newFakeAzureLBClient() *fakeAzureLBClient {
return fLBC
}
func (fLBC *fakeAzureLBClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer) (resp *http.Response, err error) {
func (fLBC *fakeAzureLBClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, loadBalancerName string, parameters network.LoadBalancer, etag string) (resp *http.Response, err error) {
fLBC.mutex.Lock()
defer fLBC.mutex.Unlock()
@ -642,7 +642,7 @@ func newFakeRoutesClient() *fakeRoutesClient {
return fRC
}
func (fRC *fakeRoutesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, routeTableName string, routeName string, routeParameters network.Route) (resp *http.Response, err error) {
func (fRC *fakeRoutesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, routeTableName string, routeName string, routeParameters network.Route, etag string) (resp *http.Response, err error) {
fRC.mutex.Lock()
defer fRC.mutex.Unlock()
@ -683,7 +683,7 @@ func newFakeRouteTablesClient() *fakeRouteTablesClient {
return fRTC
}
func (fRTC *fakeRouteTablesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, routeTableName string, parameters network.RouteTable) (resp *http.Response, err error) {
func (fRTC *fakeRouteTablesClient) CreateOrUpdate(ctx context.Context, resourceGroupName string, routeTableName string, parameters network.RouteTable, etag string) (resp *http.Response, err error) {
fRTC.mutex.Lock()
defer fRTC.mutex.Unlock()

View File

@ -147,7 +147,8 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
return nil, err
}
if _, err := az.reconcilePublicIP(clusterName, updateService, lb, true /* wantLb */); err != nil {
// lb is not reused here because the ETAG may be changed in above operations, hence reconcilePublicIP() would get lb again from cache.
if _, err := az.reconcilePublicIP(clusterName, updateService, to.String(lb.Name), true /* wantLb */); err != nil {
return nil, err
}
@ -203,7 +204,7 @@ func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName stri
}
}
if _, err := az.reconcilePublicIP(clusterName, service, nil, false /* wantLb */); err != nil {
if _, err := az.reconcilePublicIP(clusterName, service, "", false /* wantLb */); err != nil {
if ignoreErrors(err) != nil {
return err
}
@ -1323,9 +1324,10 @@ func deduplicate(collection *[]string) *[]string {
}
// This reconciles the PublicIP resources similar to how the LB is reconciled.
func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lb *network.LoadBalancer, wantLb bool) (*network.PublicIPAddress, error) {
func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbName string, wantLb bool) (*network.PublicIPAddress, error) {
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
var lb *network.LoadBalancer
var desiredPipName string
var err error
if !isInternal && wantLb {
@ -1335,6 +1337,14 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lb *
}
}
if lbName != "" {
loadBalancer, _, err := az.getAzureLoadBalancer(lbName)
if err != nil {
return nil, err
}
lb = &loadBalancer
}
pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
pips, err := az.ListPIP(service, pipResourceGroup)

View File

@ -879,13 +879,13 @@ func TestReconcilePublicIPWithNewService(t *testing.T) {
az := getTestCloud()
svc := getTestService("servicea", v1.ProtocolTCP, 80, 443)
pip, err := az.reconcilePublicIP(testClusterName, &svc, nil, true /* wantLb*/)
pip, err := az.reconcilePublicIP(testClusterName, &svc, "", true /* wantLb*/)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
validatePublicIP(t, pip, &svc, true)
pip2, err := az.reconcilePublicIP(testClusterName, &svc, nil, true /* wantLb */)
pip2, err := az.reconcilePublicIP(testClusterName, &svc, "", true /* wantLb */)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -900,7 +900,7 @@ func TestReconcilePublicIPRemoveService(t *testing.T) {
az := getTestCloud()
svc := getTestService("servicea", v1.ProtocolTCP, 80, 443)
pip, err := az.reconcilePublicIP(testClusterName, &svc, nil, true /* wantLb*/)
pip, err := az.reconcilePublicIP(testClusterName, &svc, "", true /* wantLb*/)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -908,7 +908,7 @@ func TestReconcilePublicIPRemoveService(t *testing.T) {
validatePublicIP(t, pip, &svc, true)
// Remove the service
pip, err = az.reconcilePublicIP(testClusterName, &svc, nil, false /* wantLb */)
pip, err = az.reconcilePublicIP(testClusterName, &svc, "", false /* wantLb */)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -920,7 +920,7 @@ func TestReconcilePublicIPWithInternalService(t *testing.T) {
az := getTestCloud()
svc := getInternalTestService("servicea", 80, 443)
pip, err := az.reconcilePublicIP(testClusterName, &svc, nil, true /* wantLb*/)
pip, err := az.reconcilePublicIP(testClusterName, &svc, "", true /* wantLb*/)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -932,7 +932,7 @@ func TestReconcilePublicIPWithExternalAndInternalSwitch(t *testing.T) {
az := getTestCloud()
svc := getInternalTestService("servicea", 80, 443)
pip, err := az.reconcilePublicIP(testClusterName, &svc, nil, true /* wantLb*/)
pip, err := az.reconcilePublicIP(testClusterName, &svc, "", true /* wantLb*/)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -940,14 +940,14 @@ func TestReconcilePublicIPWithExternalAndInternalSwitch(t *testing.T) {
// Update to external service
svcUpdated := getTestService("servicea", v1.ProtocolTCP, 80)
pip, err = az.reconcilePublicIP(testClusterName, &svcUpdated, nil, true /* wantLb*/)
pip, err = az.reconcilePublicIP(testClusterName, &svcUpdated, "", true /* wantLb*/)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
validatePublicIP(t, pip, &svcUpdated, true)
// Update to internal service again
pip, err = az.reconcilePublicIP(testClusterName, &svc, nil, true /* wantLb*/)
pip, err = az.reconcilePublicIP(testClusterName, &svc, "", true /* wantLb*/)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}