From 51d44af1866d97e1d7446e27ee82ff7d3ce09d93 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Sun, 16 Feb 2020 11:12:50 +0000 Subject: [PATCH] Fix route conflicted operations when updating multiple routes together --- .../legacy-cloud-providers/azure/azure.go | 5 + .../azure/azure_routes.go | 205 +++++++++++++++--- .../azure/azure_routes_test.go | 171 ++++++--------- 3 files changed, 245 insertions(+), 136 deletions(-) diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go index 74342f42ad5..75218aa630f 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure.go @@ -272,6 +272,7 @@ type Cloud struct { kubeClient clientset.Interface eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder + routeUpdater *delayedRouteUpdater vmCache *timedCache lbCache *timedCache @@ -552,6 +553,10 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro return err } + // start delayed route updater. + az.routeUpdater = newDelayedRouteUpdater(az, routeUpdateInterval) + go az.routeUpdater.run() + return nil } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go index 25506665a4c..b7b7e795027 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes.go @@ -22,6 +22,8 @@ import ( "context" "fmt" "strings" + "sync" + "time" "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" "github.com/Azure/go-autorest/autorest/to" @@ -32,13 +34,169 @@ import ( utilnet "k8s.io/utils/net" ) +var ( + // routeUpdateInterval defines the route reconciling interval. + routeUpdateInterval = 30 * time.Second +) + +// routeOperation defines the allowed operations for route updating. +type routeOperation string + // copied to minimize the number of cross reference // and exceptions in publishing and allowed imports. const ( routeNameFmt = "%s____%s" routeNameSeparator = "____" + + // Route operations. + routeOperationAdd routeOperation = "add" + routeOperationDelete routeOperation = "delete" ) +// delayedRouteOperation defines a delayed route operation which is used in delayedRouteUpdater. +type delayedRouteOperation struct { + route network.Route + operation routeOperation + result chan error +} + +// wait waits for the operation completion and returns the result. +func (op *delayedRouteOperation) wait() error { + return <-op.result +} + +// delayedRouteUpdater defines a delayed route updater, which batches all the +// route updating operations within "interval" period. +// Example usage: +// op, err := updater.addRouteOperation(routeOperationAdd, route) +// err = op.wait() +type delayedRouteUpdater struct { + az *Cloud + interval time.Duration + + lock sync.Mutex + routesToUpdate []*delayedRouteOperation +} + +// newDelayedRouteUpdater creates a new delayedRouteUpdater. +func newDelayedRouteUpdater(az *Cloud, interval time.Duration) *delayedRouteUpdater { + return &delayedRouteUpdater{ + az: az, + interval: interval, + routesToUpdate: make([]*delayedRouteOperation, 0), + } +} + +// run starts the updater reconciling loop. +func (d *delayedRouteUpdater) run() { + for { + d.updateRoutes() + time.Sleep(d.interval) + } +} + +// updateRoutes invokes route table client to update all routes. +func (d *delayedRouteUpdater) updateRoutes() { + d.lock.Lock() + defer d.lock.Unlock() + + // No need to do any updating. + if len(d.routesToUpdate) == 0 { + return + } + + var err error + defer func() { + // Notify all the goroutines. + for _, rt := range d.routesToUpdate { + rt.result <- err + } + // Clear all the jobs. + d.routesToUpdate = make([]*delayedRouteOperation, 0) + }() + + var routeTable network.RouteTable + var existsRouteTable bool + routeTable, existsRouteTable, err = d.az.getRouteTable(cacheReadTypeDefault) + if err != nil { + klog.Errorf("getRouteTable() failed with error: %v", err) + return + } + + // create route table if it doesn't exists yet. + if !existsRouteTable { + err = d.az.createRouteTable() + if err != nil { + klog.Errorf("createRouteTable() failed with error: %v", err) + return + } + + routeTable, _, err = d.az.getRouteTable(cacheReadTypeDefault) + if err != nil { + klog.Errorf("getRouteTable() failed with error: %v", err) + return + } + } + + // reconcile routes. + dirty := false + routes := []network.Route{} + if routeTable.Routes != nil { + routes = *routeTable.Routes + } + for _, rt := range d.routesToUpdate { + routeMatch := false + for i, existingRoute := range routes { + if strings.EqualFold(to.String(existingRoute.Name), to.String(rt.route.Name)) { + // delete the name-matched routes here (missing routes would be added later if the operation is add). + routes = append(routes[:i], routes[i+1:]...) + if existingRoute.RoutePropertiesFormat != nil && + rt.route.RoutePropertiesFormat != nil && + strings.EqualFold(to.String(existingRoute.AddressPrefix), to.String(rt.route.AddressPrefix)) && + strings.EqualFold(to.String(existingRoute.NextHopIPAddress), to.String(rt.route.NextHopIPAddress)) { + routeMatch = true + } + if rt.operation == routeOperationDelete { + dirty = true + } + break + } + } + + // Add missing routes if the operation is add. + if rt.operation == routeOperationAdd { + routes = append(routes, rt.route) + if !routeMatch { + dirty = true + } + continue + } + } + + if dirty { + routeTable.Routes = &routes + err = d.az.CreateOrUpdateRouteTable(routeTable) + if err != nil { + klog.Errorf("CreateOrUpdateRouteTable() failed with error: %v", err) + return + } + } +} + +// addRouteOperation adds the routeOperation to delayedRouteUpdater and returns a delayedRouteOperation. +func (d *delayedRouteUpdater) addRouteOperation(operation routeOperation, route network.Route) (*delayedRouteOperation, error) { + d.lock.Lock() + defer d.lock.Unlock() + + op := &delayedRouteOperation{ + route: route, + operation: operation, + result: make(chan error), + } + d.routesToUpdate = append(d.routesToUpdate, op) + return op, nil +} + // ListRoutes lists all managed routes that belong to the specified clusterName func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) { klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName) @@ -97,16 +255,6 @@ func processRoutes(ipv6DualStackEnabled bool, routeTable network.RouteTable, exi return kubeRoutes, nil } -func (az *Cloud) createRouteTableIfNotExists(clusterName string, kubeRoute *cloudprovider.Route) error { - if _, existsRouteTable, err := az.getRouteTable(cacheReadTypeDefault); err != nil { - klog.V(2).Infof("createRouteTableIfNotExists error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) - return err - } else if existsRouteTable { - return nil - } - return az.createRouteTable() -} - func (az *Cloud) createRouteTable() error { routeTable := network.RouteTable{ Name: to.StringPtr(az.RouteTableName), @@ -148,10 +296,6 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s return nil } - klog.V(2).Infof("CreateRoute: creating route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) - if err := az.createRouteTableIfNotExists(clusterName, kubeRoute); err != nil { - return err - } if !az.ipv6DualStackEnabled { targetIP, _, err = az.getIPForMachine(kubeRoute.TargetNode) if err != nil { @@ -184,24 +328,17 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s }, } - actualRoutes, err := az.ListRoutes(ctx, clusterName) + klog.V(2).Infof("CreateRoute: creating route for clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) + op, err := az.routeUpdater.addRouteOperation(routeOperationAdd, route) if err != nil { - klog.V(3).Infof("CreateRoute: creating route: failed(ListRoutes) clusterName= %q instance=%q with error=%v", clusterName, kubeRoute.TargetNode, err) + klog.Errorf("CreateRoute failed for node %q with error: %v", kubeRoute.TargetNode, err) return err } - for _, actualRoute := range actualRoutes { - if strings.EqualFold(actualRoute.Name, kubeRoute.Name) && - strings.EqualFold(string(actualRoute.TargetNode), string(kubeRoute.TargetNode)) && - strings.EqualFold(actualRoute.DestinationCIDR, kubeRoute.DestinationCIDR) { - klog.V(2).Infof("CreateRoute: route is already existed and matched, no need to re-create or update") - return nil - } - } - - klog.V(3).Infof("CreateRoute: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR) - err = az.CreateOrUpdateRoute(route) + // Wait for operation complete. + err = op.wait() if err != nil { + klog.Errorf("CreateRoute failed for node %q with error: %v", kubeRoute.TargetNode, err) return err } @@ -229,8 +366,20 @@ func (az *Cloud) DeleteRoute(ctx context.Context, clusterName string, kubeRoute klog.V(2).Infof("DeleteRoute: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) routeName := mapNodeNameToRouteName(az.ipv6DualStackEnabled, kubeRoute.TargetNode, string(kubeRoute.DestinationCIDR)) - err = az.DeleteRouteWithName(routeName) + route := network.Route{ + Name: to.StringPtr(routeName), + RoutePropertiesFormat: &network.RoutePropertiesFormat{}, + } + op, err := az.routeUpdater.addRouteOperation(routeOperationDelete, route) if err != nil { + klog.Errorf("DeleteRoute failed for node %q with error: %v", kubeRoute.TargetNode, err) + return err + } + + // Wait for operation complete. + err = op.wait() + if err != nil { + klog.Errorf("DeleteRoute failed for node %q with error: %v", kubeRoute.TargetNode, err) return err } diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes_test.go index b8f16905c7f..6eeb5929176 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_routes_test.go @@ -23,6 +23,7 @@ import ( "fmt" "reflect" "testing" + "time" "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network" "github.com/Azure/go-autorest/autorest/to" @@ -34,10 +35,10 @@ import ( ) func TestDeleteRoute(t *testing.T) { - fakeRoutes := newFakeRoutesClient() + fakeTable := newFakeRouteTablesClient() cloud := &Cloud{ - RoutesClient: fakeRoutes, + RouteTablesClient: fakeTable, Config: Config{ RouteTableResourceGroup: "foo", RouteTableName: "bar", @@ -46,14 +47,30 @@ func TestDeleteRoute(t *testing.T) { unmanagedNodes: sets.NewString(), nodeInformerSynced: func() bool { return true }, } - route := cloudprovider.Route{TargetNode: "node", DestinationCIDR: "1.2.3.4/24"} + cache, _ := cloud.newRouteTableCache() + cloud.rtCache = cache + cloud.routeUpdater = newDelayedRouteUpdater(cloud, 100*time.Millisecond) + go cloud.routeUpdater.run() + route := cloudprovider.Route{ + TargetNode: "node", + DestinationCIDR: "1.2.3.4/24", + } routeName := mapNodeNameToRouteName(false, route.TargetNode, route.DestinationCIDR) - - fakeRoutes.FakeStore = map[string]map[string]network.Route{ - cloud.RouteTableName: { - routeName: {}, + routeTables := network.RouteTable{ + Name: &cloud.RouteTableName, + Location: &cloud.Location, + RouteTablePropertiesFormat: &network.RouteTablePropertiesFormat{ + Routes: &[]network.Route{ + { + Name: &routeName, + }, + }, }, } + fakeTable.FakeStore = map[string]map[string]network.RouteTable{} + fakeTable.FakeStore[cloud.RouteTableResourceGroup] = map[string]network.RouteTable{ + cloud.RouteTableName: routeTables, + } err := cloud.DeleteRoute(context.TODO(), "cluster", &route) if err != nil { @@ -61,15 +78,17 @@ func TestDeleteRoute(t *testing.T) { t.FailNow() } - mp, found := fakeRoutes.FakeStore[cloud.RouteTableName] + rt, found := fakeTable.FakeStore[cloud.RouteTableResourceGroup][cloud.RouteTableName] if !found { - t.Errorf("unexpected missing item for %s", cloud.RouteTableName) + t.Errorf("unexpected missing routetable for %s", cloud.RouteTableName) t.FailNow() } - ob, found := mp[routeName] - if found { - t.Errorf("unexpectedly found: %v that should have been deleted.", ob) - t.FailNow() + + for _, r := range *rt.Routes { + if to.String(r.Name) == routeName { + t.Errorf("unexpectedly found: %v that should have been deleted.", routeName) + t.FailNow() + } } // test delete route for unmanaged nodes. @@ -97,11 +116,9 @@ func TestDeleteRoute(t *testing.T) { func TestCreateRoute(t *testing.T) { fakeTable := newFakeRouteTablesClient() fakeVM := &fakeVMSet{} - fakeRoutes := newFakeRoutesClient() cloud := &Cloud{ RouteTablesClient: fakeTable, - RoutesClient: fakeRoutes, vmSet: fakeVM, Config: Config{ RouteTableResourceGroup: "foo", @@ -113,10 +130,13 @@ func TestCreateRoute(t *testing.T) { } cache, _ := cloud.newRouteTableCache() cloud.rtCache = cache + cloud.routeUpdater = newDelayedRouteUpdater(cloud, 100*time.Millisecond) + go cloud.routeUpdater.run() expectedTable := network.RouteTable{ - Name: &cloud.RouteTableName, - Location: &cloud.Location, + Name: &cloud.RouteTableName, + Location: &cloud.Location, + RouteTablePropertiesFormat: &network.RouteTablePropertiesFormat{}, } fakeTable.FakeStore = map[string]map[string]network.RouteTable{} fakeTable.FakeStore[cloud.RouteTableResourceGroup] = map[string]network.RouteTable{ @@ -134,45 +154,47 @@ func TestCreateRoute(t *testing.T) { t.Errorf("unexpected error create if not exists route table: %v", err) t.FailNow() } - if len(fakeTable.Calls) != 1 || fakeTable.Calls[0] != "Get" { + if len(fakeTable.Calls) != 2 { t.Errorf("unexpected calls create if not exists, exists: %v", fakeTable.Calls) } - if len(fakeRoutes.Calls) != 1 || fakeRoutes.Calls[0] != "CreateOrUpdate" { - t.Errorf("unexpected route calls create if not exists, exists: %v", fakeRoutes.Calls) - } routeName := mapNodeNameToRouteName(false, route.TargetNode, string(route.DestinationCIDR)) - routeInfo, found := fakeRoutes.FakeStore[cloud.RouteTableName][routeName] + rt, found := fakeTable.FakeStore[cloud.RouteTableResourceGroup][cloud.RouteTableName] if !found { - t.Errorf("could not find route: %v in %v", routeName, fakeRoutes.FakeStore) + t.Errorf("unexpected missing routetable for %s", cloud.RouteTableName) t.FailNow() } - if *routeInfo.AddressPrefix != route.DestinationCIDR { - t.Errorf("Expected cidr: %s, saw %s", *routeInfo.AddressPrefix, route.DestinationCIDR) + + foundRoute := false + for _, r := range *rt.Routes { + if to.String(r.Name) == routeName { + foundRoute = true + if *r.AddressPrefix != route.DestinationCIDR { + t.Errorf("Expected cidr: %s, saw %s", *r.AddressPrefix, route.DestinationCIDR) + } + if r.NextHopType != network.RouteNextHopTypeVirtualAppliance { + t.Errorf("Expected next hop: %v, saw %v", network.RouteNextHopTypeVirtualAppliance, r.NextHopType) + } + if *r.NextHopIPAddress != nodeIP { + t.Errorf("Expected IP address: %s, saw %s", nodeIP, *r.NextHopIPAddress) + } + + } } - if routeInfo.NextHopType != network.RouteNextHopTypeVirtualAppliance { - t.Errorf("Expected next hop: %v, saw %v", network.RouteNextHopTypeVirtualAppliance, routeInfo.NextHopType) - } - if *routeInfo.NextHopIPAddress != nodeIP { - t.Errorf("Expected IP address: %s, saw %s", nodeIP, *routeInfo.NextHopIPAddress) + if !foundRoute { + t.Errorf("could not find route: %v in %v", routeName, fakeTable.FakeStore) + t.FailNow() } - // test create again without real creation, clean fakeRoute calls - fakeRoutes.Calls = []string{} - routeInfo.Name = &routeName - route.Name = routeName - expectedTable.RouteTablePropertiesFormat = &network.RouteTablePropertiesFormat{ - Routes: &[]network.Route{routeInfo}, - } - cloud.rtCache.Set(cloud.RouteTableName, &expectedTable) - + // test create again without real creation, clean fakeTable calls + fakeTable.Calls = []string{} err = cloud.CreateRoute(context.TODO(), "cluster", "unused", &route) if err != nil { t.Errorf("unexpected error creating route: %v", err) t.FailNow() } - if len(fakeRoutes.Calls) != 0 { - t.Errorf("unexpected route calls create if not exists, exists: %v", fakeRoutes.Calls) + if len(fakeTable.Calls) != 1 || fakeTable.Calls[0] != "Get" { + t.Errorf("unexpected route calls create if not exists, exists: %v", fakeTable.Calls) } // test create route for unmanaged nodes. @@ -199,73 +221,6 @@ func TestCreateRoute(t *testing.T) { } } -func TestCreateRouteTableIfNotExists_Exists(t *testing.T) { - fake := newFakeRouteTablesClient() - cloud := &Cloud{ - RouteTablesClient: fake, - Config: Config{ - RouteTableResourceGroup: "foo", - RouteTableName: "bar", - Location: "location", - }, - } - cache, _ := cloud.newRouteTableCache() - cloud.rtCache = cache - - expectedTable := network.RouteTable{ - Name: &cloud.RouteTableName, - Location: &cloud.Location, - } - fake.FakeStore = map[string]map[string]network.RouteTable{} - fake.FakeStore[cloud.RouteTableResourceGroup] = map[string]network.RouteTable{ - cloud.RouteTableName: expectedTable, - } - err := cloud.createRouteTableIfNotExists("clusterName", &cloudprovider.Route{TargetNode: "node", DestinationCIDR: "1.2.3.4/16"}) - if err != nil { - t.Errorf("unexpected error create if not exists route table: %v", err) - t.FailNow() - } - if len(fake.Calls) != 1 || fake.Calls[0] != "Get" { - t.Errorf("unexpected calls create if not exists, exists: %v", fake.Calls) - } -} - -func TestCreateRouteTableIfNotExists_NotExists(t *testing.T) { - fake := newFakeRouteTablesClient() - cloud := &Cloud{ - RouteTablesClient: fake, - Config: Config{ - RouteTableResourceGroup: "foo", - RouteTableName: "bar", - Location: "location", - }, - } - cache, _ := cloud.newRouteTableCache() - cloud.rtCache = cache - - expectedTable := network.RouteTable{ - Name: &cloud.RouteTableName, - Location: &cloud.Location, - } - - err := cloud.createRouteTableIfNotExists("clusterName", &cloudprovider.Route{TargetNode: "node", DestinationCIDR: "1.2.3.4/16"}) - if err != nil { - t.Errorf("unexpected error create if not exists route table: %v", err) - t.FailNow() - } - - table := fake.FakeStore[cloud.RouteTableResourceGroup][cloud.RouteTableName] - if *table.Location != *expectedTable.Location { - t.Errorf("mismatch: %s vs %s", *table.Location, *expectedTable.Location) - } - if *table.Name != *expectedTable.Name { - t.Errorf("mismatch: %s vs %s", *table.Name, *expectedTable.Name) - } - if len(fake.Calls) != 2 || fake.Calls[0] != "Get" || fake.Calls[1] != "CreateOrUpdate" { - t.Errorf("unexpected calls create if not exists, exists: %v", fake.Calls) - } -} - func TestCreateRouteTable(t *testing.T) { fake := newFakeRouteTablesClient() cloud := &Cloud{