Merge pull request #88209 from feiskyer/fix-route-updates

Fix route conflicted operations when updating multiple routes together
This commit is contained in:
Kubernetes Prow Robot 2020-02-16 22:41:28 -08:00 committed by GitHub
commit 3ed0f1bec1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 245 additions and 136 deletions

View File

@ -272,6 +272,7 @@ type Cloud struct {
kubeClient clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
routeUpdater *delayedRouteUpdater
vmCache *timedCache
lbCache *timedCache
@ -552,6 +553,10 @@ func (az *Cloud) InitializeCloudFromConfig(config *Config, fromSecret bool) erro
return err
}
// start delayed route updater.
az.routeUpdater = newDelayedRouteUpdater(az, routeUpdateInterval)
go az.routeUpdater.run()
return nil
}

View File

@ -22,6 +22,8 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
"github.com/Azure/go-autorest/autorest/to"
@ -32,13 +34,169 @@ import (
utilnet "k8s.io/utils/net"
)
var (
// routeUpdateInterval defines the route reconciling interval.
routeUpdateInterval = 30 * time.Second
)
// routeOperation defines the allowed operations for route updating.
type routeOperation string
// copied to minimize the number of cross reference
// and exceptions in publishing and allowed imports.
const (
routeNameFmt = "%s____%s"
routeNameSeparator = "____"
// Route operations.
routeOperationAdd routeOperation = "add"
routeOperationDelete routeOperation = "delete"
)
// delayedRouteOperation defines a delayed route operation which is used in delayedRouteUpdater.
type delayedRouteOperation struct {
route network.Route
operation routeOperation
result chan error
}
// wait waits for the operation completion and returns the result.
func (op *delayedRouteOperation) wait() error {
return <-op.result
}
// delayedRouteUpdater defines a delayed route updater, which batches all the
// route updating operations within "interval" period.
// Example usage:
// op, err := updater.addRouteOperation(routeOperationAdd, route)
// err = op.wait()
type delayedRouteUpdater struct {
az *Cloud
interval time.Duration
lock sync.Mutex
routesToUpdate []*delayedRouteOperation
}
// newDelayedRouteUpdater creates a new delayedRouteUpdater.
func newDelayedRouteUpdater(az *Cloud, interval time.Duration) *delayedRouteUpdater {
return &delayedRouteUpdater{
az: az,
interval: interval,
routesToUpdate: make([]*delayedRouteOperation, 0),
}
}
// run starts the updater reconciling loop.
func (d *delayedRouteUpdater) run() {
for {
d.updateRoutes()
time.Sleep(d.interval)
}
}
// updateRoutes invokes route table client to update all routes.
func (d *delayedRouteUpdater) updateRoutes() {
d.lock.Lock()
defer d.lock.Unlock()
// No need to do any updating.
if len(d.routesToUpdate) == 0 {
return
}
var err error
defer func() {
// Notify all the goroutines.
for _, rt := range d.routesToUpdate {
rt.result <- err
}
// Clear all the jobs.
d.routesToUpdate = make([]*delayedRouteOperation, 0)
}()
var routeTable network.RouteTable
var existsRouteTable bool
routeTable, existsRouteTable, err = d.az.getRouteTable(cacheReadTypeDefault)
if err != nil {
klog.Errorf("getRouteTable() failed with error: %v", err)
return
}
// create route table if it doesn't exists yet.
if !existsRouteTable {
err = d.az.createRouteTable()
if err != nil {
klog.Errorf("createRouteTable() failed with error: %v", err)
return
}
routeTable, _, err = d.az.getRouteTable(cacheReadTypeDefault)
if err != nil {
klog.Errorf("getRouteTable() failed with error: %v", err)
return
}
}
// reconcile routes.
dirty := false
routes := []network.Route{}
if routeTable.Routes != nil {
routes = *routeTable.Routes
}
for _, rt := range d.routesToUpdate {
routeMatch := false
for i, existingRoute := range routes {
if strings.EqualFold(to.String(existingRoute.Name), to.String(rt.route.Name)) {
// delete the name-matched routes here (missing routes would be added later if the operation is add).
routes = append(routes[:i], routes[i+1:]...)
if existingRoute.RoutePropertiesFormat != nil &&
rt.route.RoutePropertiesFormat != nil &&
strings.EqualFold(to.String(existingRoute.AddressPrefix), to.String(rt.route.AddressPrefix)) &&
strings.EqualFold(to.String(existingRoute.NextHopIPAddress), to.String(rt.route.NextHopIPAddress)) {
routeMatch = true
}
if rt.operation == routeOperationDelete {
dirty = true
}
break
}
}
// Add missing routes if the operation is add.
if rt.operation == routeOperationAdd {
routes = append(routes, rt.route)
if !routeMatch {
dirty = true
}
continue
}
}
if dirty {
routeTable.Routes = &routes
err = d.az.CreateOrUpdateRouteTable(routeTable)
if err != nil {
klog.Errorf("CreateOrUpdateRouteTable() failed with error: %v", err)
return
}
}
}
// addRouteOperation adds the routeOperation to delayedRouteUpdater and returns a delayedRouteOperation.
func (d *delayedRouteUpdater) addRouteOperation(operation routeOperation, route network.Route) (*delayedRouteOperation, error) {
d.lock.Lock()
defer d.lock.Unlock()
op := &delayedRouteOperation{
route: route,
operation: operation,
result: make(chan error),
}
d.routesToUpdate = append(d.routesToUpdate, op)
return op, nil
}
// ListRoutes lists all managed routes that belong to the specified clusterName
func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
klog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName)
@ -97,16 +255,6 @@ func processRoutes(ipv6DualStackEnabled bool, routeTable network.RouteTable, exi
return kubeRoutes, nil
}
func (az *Cloud) createRouteTableIfNotExists(clusterName string, kubeRoute *cloudprovider.Route) error {
if _, existsRouteTable, err := az.getRouteTable(cacheReadTypeDefault); err != nil {
klog.V(2).Infof("createRouteTableIfNotExists error: couldn't get routetable. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
return err
} else if existsRouteTable {
return nil
}
return az.createRouteTable()
}
func (az *Cloud) createRouteTable() error {
routeTable := network.RouteTable{
Name: to.StringPtr(az.RouteTableName),
@ -148,10 +296,6 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s
return nil
}
klog.V(2).Infof("CreateRoute: creating route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
if err := az.createRouteTableIfNotExists(clusterName, kubeRoute); err != nil {
return err
}
if !az.ipv6DualStackEnabled {
targetIP, _, err = az.getIPForMachine(kubeRoute.TargetNode)
if err != nil {
@ -184,24 +328,17 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s
},
}
actualRoutes, err := az.ListRoutes(ctx, clusterName)
klog.V(2).Infof("CreateRoute: creating route for clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
op, err := az.routeUpdater.addRouteOperation(routeOperationAdd, route)
if err != nil {
klog.V(3).Infof("CreateRoute: creating route: failed(ListRoutes) clusterName= %q instance=%q with error=%v", clusterName, kubeRoute.TargetNode, err)
klog.Errorf("CreateRoute failed for node %q with error: %v", kubeRoute.TargetNode, err)
return err
}
for _, actualRoute := range actualRoutes {
if strings.EqualFold(actualRoute.Name, kubeRoute.Name) &&
strings.EqualFold(string(actualRoute.TargetNode), string(kubeRoute.TargetNode)) &&
strings.EqualFold(actualRoute.DestinationCIDR, kubeRoute.DestinationCIDR) {
klog.V(2).Infof("CreateRoute: route is already existed and matched, no need to re-create or update")
return nil
}
}
klog.V(3).Infof("CreateRoute: creating route: instance=%q cidr=%q", kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
err = az.CreateOrUpdateRoute(route)
// Wait for operation complete.
err = op.wait()
if err != nil {
klog.Errorf("CreateRoute failed for node %q with error: %v", kubeRoute.TargetNode, err)
return err
}
@ -229,8 +366,20 @@ func (az *Cloud) DeleteRoute(ctx context.Context, clusterName string, kubeRoute
klog.V(2).Infof("DeleteRoute: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
routeName := mapNodeNameToRouteName(az.ipv6DualStackEnabled, kubeRoute.TargetNode, string(kubeRoute.DestinationCIDR))
err = az.DeleteRouteWithName(routeName)
route := network.Route{
Name: to.StringPtr(routeName),
RoutePropertiesFormat: &network.RoutePropertiesFormat{},
}
op, err := az.routeUpdater.addRouteOperation(routeOperationDelete, route)
if err != nil {
klog.Errorf("DeleteRoute failed for node %q with error: %v", kubeRoute.TargetNode, err)
return err
}
// Wait for operation complete.
err = op.wait()
if err != nil {
klog.Errorf("DeleteRoute failed for node %q with error: %v", kubeRoute.TargetNode, err)
return err
}

View File

@ -23,6 +23,7 @@ import (
"fmt"
"reflect"
"testing"
"time"
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
"github.com/Azure/go-autorest/autorest/to"
@ -34,10 +35,10 @@ import (
)
func TestDeleteRoute(t *testing.T) {
fakeRoutes := newFakeRoutesClient()
fakeTable := newFakeRouteTablesClient()
cloud := &Cloud{
RoutesClient: fakeRoutes,
RouteTablesClient: fakeTable,
Config: Config{
RouteTableResourceGroup: "foo",
RouteTableName: "bar",
@ -46,14 +47,30 @@ func TestDeleteRoute(t *testing.T) {
unmanagedNodes: sets.NewString(),
nodeInformerSynced: func() bool { return true },
}
route := cloudprovider.Route{TargetNode: "node", DestinationCIDR: "1.2.3.4/24"}
cache, _ := cloud.newRouteTableCache()
cloud.rtCache = cache
cloud.routeUpdater = newDelayedRouteUpdater(cloud, 100*time.Millisecond)
go cloud.routeUpdater.run()
route := cloudprovider.Route{
TargetNode: "node",
DestinationCIDR: "1.2.3.4/24",
}
routeName := mapNodeNameToRouteName(false, route.TargetNode, route.DestinationCIDR)
fakeRoutes.FakeStore = map[string]map[string]network.Route{
cloud.RouteTableName: {
routeName: {},
routeTables := network.RouteTable{
Name: &cloud.RouteTableName,
Location: &cloud.Location,
RouteTablePropertiesFormat: &network.RouteTablePropertiesFormat{
Routes: &[]network.Route{
{
Name: &routeName,
},
},
},
}
fakeTable.FakeStore = map[string]map[string]network.RouteTable{}
fakeTable.FakeStore[cloud.RouteTableResourceGroup] = map[string]network.RouteTable{
cloud.RouteTableName: routeTables,
}
err := cloud.DeleteRoute(context.TODO(), "cluster", &route)
if err != nil {
@ -61,15 +78,17 @@ func TestDeleteRoute(t *testing.T) {
t.FailNow()
}
mp, found := fakeRoutes.FakeStore[cloud.RouteTableName]
rt, found := fakeTable.FakeStore[cloud.RouteTableResourceGroup][cloud.RouteTableName]
if !found {
t.Errorf("unexpected missing item for %s", cloud.RouteTableName)
t.Errorf("unexpected missing routetable for %s", cloud.RouteTableName)
t.FailNow()
}
ob, found := mp[routeName]
if found {
t.Errorf("unexpectedly found: %v that should have been deleted.", ob)
t.FailNow()
for _, r := range *rt.Routes {
if to.String(r.Name) == routeName {
t.Errorf("unexpectedly found: %v that should have been deleted.", routeName)
t.FailNow()
}
}
// test delete route for unmanaged nodes.
@ -97,11 +116,9 @@ func TestDeleteRoute(t *testing.T) {
func TestCreateRoute(t *testing.T) {
fakeTable := newFakeRouteTablesClient()
fakeVM := &fakeVMSet{}
fakeRoutes := newFakeRoutesClient()
cloud := &Cloud{
RouteTablesClient: fakeTable,
RoutesClient: fakeRoutes,
vmSet: fakeVM,
Config: Config{
RouteTableResourceGroup: "foo",
@ -113,10 +130,13 @@ func TestCreateRoute(t *testing.T) {
}
cache, _ := cloud.newRouteTableCache()
cloud.rtCache = cache
cloud.routeUpdater = newDelayedRouteUpdater(cloud, 100*time.Millisecond)
go cloud.routeUpdater.run()
expectedTable := network.RouteTable{
Name: &cloud.RouteTableName,
Location: &cloud.Location,
Name: &cloud.RouteTableName,
Location: &cloud.Location,
RouteTablePropertiesFormat: &network.RouteTablePropertiesFormat{},
}
fakeTable.FakeStore = map[string]map[string]network.RouteTable{}
fakeTable.FakeStore[cloud.RouteTableResourceGroup] = map[string]network.RouteTable{
@ -134,45 +154,47 @@ func TestCreateRoute(t *testing.T) {
t.Errorf("unexpected error create if not exists route table: %v", err)
t.FailNow()
}
if len(fakeTable.Calls) != 1 || fakeTable.Calls[0] != "Get" {
if len(fakeTable.Calls) != 2 {
t.Errorf("unexpected calls create if not exists, exists: %v", fakeTable.Calls)
}
if len(fakeRoutes.Calls) != 1 || fakeRoutes.Calls[0] != "CreateOrUpdate" {
t.Errorf("unexpected route calls create if not exists, exists: %v", fakeRoutes.Calls)
}
routeName := mapNodeNameToRouteName(false, route.TargetNode, string(route.DestinationCIDR))
routeInfo, found := fakeRoutes.FakeStore[cloud.RouteTableName][routeName]
rt, found := fakeTable.FakeStore[cloud.RouteTableResourceGroup][cloud.RouteTableName]
if !found {
t.Errorf("could not find route: %v in %v", routeName, fakeRoutes.FakeStore)
t.Errorf("unexpected missing routetable for %s", cloud.RouteTableName)
t.FailNow()
}
if *routeInfo.AddressPrefix != route.DestinationCIDR {
t.Errorf("Expected cidr: %s, saw %s", *routeInfo.AddressPrefix, route.DestinationCIDR)
foundRoute := false
for _, r := range *rt.Routes {
if to.String(r.Name) == routeName {
foundRoute = true
if *r.AddressPrefix != route.DestinationCIDR {
t.Errorf("Expected cidr: %s, saw %s", *r.AddressPrefix, route.DestinationCIDR)
}
if r.NextHopType != network.RouteNextHopTypeVirtualAppliance {
t.Errorf("Expected next hop: %v, saw %v", network.RouteNextHopTypeVirtualAppliance, r.NextHopType)
}
if *r.NextHopIPAddress != nodeIP {
t.Errorf("Expected IP address: %s, saw %s", nodeIP, *r.NextHopIPAddress)
}
}
}
if routeInfo.NextHopType != network.RouteNextHopTypeVirtualAppliance {
t.Errorf("Expected next hop: %v, saw %v", network.RouteNextHopTypeVirtualAppliance, routeInfo.NextHopType)
}
if *routeInfo.NextHopIPAddress != nodeIP {
t.Errorf("Expected IP address: %s, saw %s", nodeIP, *routeInfo.NextHopIPAddress)
if !foundRoute {
t.Errorf("could not find route: %v in %v", routeName, fakeTable.FakeStore)
t.FailNow()
}
// test create again without real creation, clean fakeRoute calls
fakeRoutes.Calls = []string{}
routeInfo.Name = &routeName
route.Name = routeName
expectedTable.RouteTablePropertiesFormat = &network.RouteTablePropertiesFormat{
Routes: &[]network.Route{routeInfo},
}
cloud.rtCache.Set(cloud.RouteTableName, &expectedTable)
// test create again without real creation, clean fakeTable calls
fakeTable.Calls = []string{}
err = cloud.CreateRoute(context.TODO(), "cluster", "unused", &route)
if err != nil {
t.Errorf("unexpected error creating route: %v", err)
t.FailNow()
}
if len(fakeRoutes.Calls) != 0 {
t.Errorf("unexpected route calls create if not exists, exists: %v", fakeRoutes.Calls)
if len(fakeTable.Calls) != 1 || fakeTable.Calls[0] != "Get" {
t.Errorf("unexpected route calls create if not exists, exists: %v", fakeTable.Calls)
}
// test create route for unmanaged nodes.
@ -199,73 +221,6 @@ func TestCreateRoute(t *testing.T) {
}
}
func TestCreateRouteTableIfNotExists_Exists(t *testing.T) {
fake := newFakeRouteTablesClient()
cloud := &Cloud{
RouteTablesClient: fake,
Config: Config{
RouteTableResourceGroup: "foo",
RouteTableName: "bar",
Location: "location",
},
}
cache, _ := cloud.newRouteTableCache()
cloud.rtCache = cache
expectedTable := network.RouteTable{
Name: &cloud.RouteTableName,
Location: &cloud.Location,
}
fake.FakeStore = map[string]map[string]network.RouteTable{}
fake.FakeStore[cloud.RouteTableResourceGroup] = map[string]network.RouteTable{
cloud.RouteTableName: expectedTable,
}
err := cloud.createRouteTableIfNotExists("clusterName", &cloudprovider.Route{TargetNode: "node", DestinationCIDR: "1.2.3.4/16"})
if err != nil {
t.Errorf("unexpected error create if not exists route table: %v", err)
t.FailNow()
}
if len(fake.Calls) != 1 || fake.Calls[0] != "Get" {
t.Errorf("unexpected calls create if not exists, exists: %v", fake.Calls)
}
}
func TestCreateRouteTableIfNotExists_NotExists(t *testing.T) {
fake := newFakeRouteTablesClient()
cloud := &Cloud{
RouteTablesClient: fake,
Config: Config{
RouteTableResourceGroup: "foo",
RouteTableName: "bar",
Location: "location",
},
}
cache, _ := cloud.newRouteTableCache()
cloud.rtCache = cache
expectedTable := network.RouteTable{
Name: &cloud.RouteTableName,
Location: &cloud.Location,
}
err := cloud.createRouteTableIfNotExists("clusterName", &cloudprovider.Route{TargetNode: "node", DestinationCIDR: "1.2.3.4/16"})
if err != nil {
t.Errorf("unexpected error create if not exists route table: %v", err)
t.FailNow()
}
table := fake.FakeStore[cloud.RouteTableResourceGroup][cloud.RouteTableName]
if *table.Location != *expectedTable.Location {
t.Errorf("mismatch: %s vs %s", *table.Location, *expectedTable.Location)
}
if *table.Name != *expectedTable.Name {
t.Errorf("mismatch: %s vs %s", *table.Name, *expectedTable.Name)
}
if len(fake.Calls) != 2 || fake.Calls[0] != "Get" || fake.Calls[1] != "CreateOrUpdate" {
t.Errorf("unexpected calls create if not exists, exists: %v", fake.Calls)
}
}
func TestCreateRouteTable(t *testing.T) {
fake := newFakeRouteTablesClient()
cloud := &Cloud{