From 19d7d85a67ec56210f20918dcc2bebe615dbf08c Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Tue, 28 Aug 2018 13:14:01 +0800 Subject: [PATCH 1/3] Add on-prem nodes support to Azure cloud provider On-prem nodes should register themselves with required labels, e.g. kubelet --node-labels=alpha.service-controller.kubernetes.io/exclude-balancer=true,kubernetes.azure.com/managed=false ... --- .../providers/azure/azure_instances.go | 47 +++++++++++++++++++ .../providers/azure/azure_routes.go | 20 ++++++++ .../providers/azure/azure_standard.go | 2 +- .../providers/azure/azure_wrap.go | 21 +++++++++ .../providers/azure/azure_zones.go | 16 +++++++ 5 files changed, 105 insertions(+), 1 deletion(-) diff --git a/pkg/cloudprovider/providers/azure/azure_instances.go b/pkg/cloudprovider/providers/azure/azure_instances.go index e2d5b3cb754..7eb403a7b29 100644 --- a/pkg/cloudprovider/providers/azure/azure_instances.go +++ b/pkg/cloudprovider/providers/azure/azure_instances.go @@ -30,6 +30,16 @@ import ( // NodeAddresses returns the addresses of the specified instance. func (az *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) { + // Returns nil for unmanaged nodes because azure cloud provider couldn't fetch information for them. + unmanaged, err := az.IsNodeUnmanaged(string(name)) + if err != nil { + return nil, err + } + if unmanaged { + glog.V(4).Infof("NodeAddresses: omitting unmanaged node %q", name) + return nil, nil + } + addressGetter := func(nodeName types.NodeName) ([]v1.NodeAddress, error) { ip, publicIP, err := az.GetIPForMachineWithRetry(nodeName) if err != nil { @@ -92,6 +102,12 @@ func (az *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.N // This method will not be called from the node that is requesting this ID. i.e. 
metadata service // and other local methods cannot be used here func (az *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) { + // Returns nil for unmanaged nodes because azure cloud provider couldn't fetch information for them. + if az.IsNodeUnmanagedByProviderID(providerID) { + glog.V(4).Infof("NodeAddressesByProviderID: omitting unmanaged node %q", providerID) + return nil, nil + } + name, err := az.vmSet.GetNodeNameByProviderID(providerID) if err != nil { return nil, err @@ -103,6 +119,12 @@ func (az *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID strin // InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running. // If false is returned with no error, the instance will be immediately deleted by the cloud controller manager. func (az *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) { + // Returns true for unmanaged nodes because azure cloud provider always assumes them exists. + if az.IsNodeUnmanagedByProviderID(providerID) { + glog.V(4).Infof("InstanceExistsByProviderID: assuming unmanaged node %q exists", providerID) + return true, nil + } + name, err := az.vmSet.GetNodeNameByProviderID(providerID) if err != nil { return false, err @@ -154,6 +176,15 @@ func (az *Cloud) isCurrentInstance(name types.NodeName, metadataVMName string) ( // Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound) func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, error) { nodeName := mapNodeNameToVMName(name) + unmanaged, err := az.IsNodeUnmanaged(nodeName) + if err != nil { + return "", err + } + if unmanaged { + // InstanceID is same with nodeName for unmanaged nodes. 
+ glog.V(4).Infof("InstanceID: getting ID %q for unmanaged node %q", name, name) + return nodeName, nil + } if az.UseInstanceMetadata { computeMetadata, err := az.getComputeMetadata() @@ -202,6 +233,12 @@ func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, e // This method will not be called from the node that is requesting this ID. i.e. metadata service // and other local methods cannot be used here func (az *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) { + // Returns "" for unmanaged nodes because azure cloud provider couldn't fetch information for them. + if az.IsNodeUnmanagedByProviderID(providerID) { + glog.V(4).Infof("InstanceTypeByProviderID: omitting unmanaged node %q", providerID) + return "", nil + } + name, err := az.vmSet.GetNodeNameByProviderID(providerID) if err != nil { return "", err @@ -215,6 +252,16 @@ func (az *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string // (Implementer Note): This is used by kubelet. Kubelet will label the node. Real log from kubelet: // Adding node label from cloud provider: beta.kubernetes.io/instance-type=[value] func (az *Cloud) InstanceType(ctx context.Context, name types.NodeName) (string, error) { + // Returns "" for unmanaged nodes because azure cloud provider couldn't fetch information for them. 
+ unmanaged, err := az.IsNodeUnmanaged(string(name)) + if err != nil { + return "", err + } + if unmanaged { + glog.V(4).Infof("InstanceType: omitting unmanaged node %q", name) + return "", nil + } + if az.UseInstanceMetadata { computeMetadata, err := az.getComputeMetadata() if err != nil { diff --git a/pkg/cloudprovider/providers/azure/azure_routes.go b/pkg/cloudprovider/providers/azure/azure_routes.go index 7f627a98a1d..20a49064cb5 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes.go +++ b/pkg/cloudprovider/providers/azure/azure_routes.go @@ -107,6 +107,16 @@ func (az *Cloud) createRouteTable() error { // route.Name will be ignored, although the cloud-provider may use nameHint // to create a more user-meaningful name. func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, kubeRoute *cloudprovider.Route) error { + // Returns for unmanaged nodes because azure cloud provider couldn't fetch information for them. + unmanaged, err := az.IsNodeUnmanaged(string(kubeRoute.TargetNode)) + if err != nil { + return err + } + if unmanaged { + glog.V(2).Infof("CreateRoute: omitting unmanaged node %q", kubeRoute.TargetNode) + return nil + } + glog.V(2).Infof("CreateRoute: creating route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) if err := az.createRouteTableIfNotExists(clusterName, kubeRoute); err != nil { return err @@ -150,6 +160,16 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s // DeleteRoute deletes the specified managed route // Route should be as returned by ListRoutes func (az *Cloud) DeleteRoute(ctx context.Context, clusterName string, kubeRoute *cloudprovider.Route) error { + // Returns for unmanaged nodes because azure cloud provider couldn't fetch information for them. 
+ unmanaged, err := az.IsNodeUnmanaged(string(kubeRoute.TargetNode)) + if err != nil { + return err + } + if unmanaged { + glog.V(2).Infof("DeleteRoute: omitting unmanaged node %q", kubeRoute.TargetNode) + return nil + } + glog.V(2).Infof("DeleteRoute: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR) ctx, cancel := getContextWithCancel() diff --git a/pkg/cloudprovider/providers/azure/azure_standard.go b/pkg/cloudprovider/providers/azure/azure_standard.go index 8fec048c256..f5b39274c22 100644 --- a/pkg/cloudprovider/providers/azure/azure_standard.go +++ b/pkg/cloudprovider/providers/azure/azure_standard.go @@ -625,7 +625,7 @@ func (as *availabilitySet) ensureHostInPool(serviceName string, nodeName types.N } if nic.ProvisioningState != nil && *nic.ProvisioningState == nicFailedState { - glog.V(3).Infof("ensureHostInPool skips node %s because its primdary nic %s is in Failed state", nodeName, nic.Name) + glog.V(3).Infof("ensureHostInPool skips node %s because its primary nic %s is in Failed state", nodeName, *nic.Name) return nil } diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go index f3d2e642805..86bb87325cc 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -19,6 +19,7 @@ package azure import ( "fmt" "net/http" + "regexp" "strings" "time" @@ -36,6 +37,8 @@ var ( lbCacheTTL = 2 * time.Minute nsgCacheTTL = 2 * time.Minute rtCacheTTL = 2 * time.Minute + + azureNodeProviderIDRE = regexp.MustCompile(`^azure:///subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Compute/(?:.*)`) ) // checkExistsFromError inspects an error and returns a true if err is nil, @@ -283,3 +286,21 @@ func (az *Cloud) useStandardLoadBalancer() bool { func (az *Cloud) excludeMasterNodesFromStandardLB() bool { return az.ExcludeMasterFromStandardLB != nil && *az.ExcludeMasterFromStandardLB } + +// 
IsNodeUnmanaged returns true if the node is not managed by Azure cloud provider. +// Those nodes include on-prem or VMs from other clouds. They will not be added to load balancer +// backends. Azure routes and managed disks are also not supported for them. +func (az *Cloud) IsNodeUnmanaged(nodeName string) (bool, error) { + unmanagedNodes, err := az.GetUnmanagedNodes() + if err != nil { + return false, err + } + + return unmanagedNodes.Has(nodeName), nil +} + +// IsNodeUnmanagedByProviderID returns true if the node is not managed by Azure cloud provider, i.e. its providerID does NOT match the managed format below. +// All managed node's providerIDs are in format 'azure:///subscriptions/<subscriptionID>/resourceGroups/<resourceGroup>/providers/Microsoft.Compute/.*' +func (az *Cloud) IsNodeUnmanagedByProviderID(providerID string) bool { + return !azureNodeProviderIDRE.MatchString(providerID) +} diff --git a/pkg/cloudprovider/providers/azure/azure_zones.go b/pkg/cloudprovider/providers/azure/azure_zones.go index 0d4195acc68..08aab6d07e4 100644 --- a/pkg/cloudprovider/providers/azure/azure_zones.go +++ b/pkg/cloudprovider/providers/azure/azure_zones.go @@ -103,6 +103,12 @@ func (az *Cloud) getZoneFromFaultDomain() (cloudprovider.Zone, error) { // This is particularly useful in external cloud providers where the kubelet // does not initialize node data. func (az *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) { + // Returns nil for unmanaged nodes because azure cloud provider couldn't fetch information for them. + if az.IsNodeUnmanagedByProviderID(providerID) { + glog.V(2).Infof("GetZoneByProviderID: omitting unmanaged node %q", providerID) + return cloudprovider.Zone{}, nil + } + nodeName, err := az.vmSet.GetNodeNameByProviderID(providerID) if err != nil { return cloudprovider.Zone{}, err @@ -115,6 +121,16 @@ func (az *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (cl // This is particularly useful in external cloud providers where the kubelet // does not initialize node data. 
func (az *Cloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) { + // Returns "" for unmanaged nodes because azure cloud provider couldn't fetch information for them. + unmanaged, err := az.IsNodeUnmanaged(string(nodeName)) + if err != nil { + return cloudprovider.Zone{}, err + } + if unmanaged { + glog.V(2).Infof("GetZoneByNodeName: omitting unmanaged node %q", nodeName) + return cloudprovider.Zone{}, nil + } + return az.vmSet.GetZoneByNodeName(string(nodeName)) } From 919058b315e07694e6fdb0fc7b38d88eb23e2baf Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Tue, 28 Aug 2018 14:40:17 +0800 Subject: [PATCH 2/3] Compose routes for on-prem nodes Compose faked routes for unmanaged nodes so that node controller would assume the routes for them have already been created. --- pkg/cloudprovider/providers/azure/azure.go | 8 ++++- .../providers/azure/azure_routes.go | 36 +++++++++++++++++-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go index bd27adf2611..5263773e59e 100644 --- a/pkg/cloudprovider/providers/azure/azure.go +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -159,7 +159,7 @@ type Cloud struct { metadata *InstanceMetadata vmSet VMSet - // Lock for access to node caches + // Lock for access to node caches, includes nodeZones, nodeResourceGroups, and unmanagedNodes. nodeCachesLock sync.Mutex // nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone // it is updated by the nodeInformer @@ -171,6 +171,11 @@ type Cloud struct { // nodeInformerSynced is for determining if the informer has synced. nodeInformerSynced cache.InformerSynced + // routeCIDRsLock holds lock for routeCIDRs cache. + routeCIDRsLock sync.Mutex + // routeCIDRs holds cache for route CIDRs. + routeCIDRs map[string]string + // Clients for vmss. 
VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient @@ -270,6 +275,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { nodeZones: map[string]sets.String{}, nodeResourceGroups: map[string]string{}, unmanagedNodes: sets.NewString(), + routeCIDRs: map[string]string{}, DisksClient: newAzDisksClient(azClientConfig), RoutesClient: newAzRoutesClient(azClientConfig), diff --git a/pkg/cloudprovider/providers/azure/azure_routes.go b/pkg/cloudprovider/providers/azure/azure_routes.go index 20a49064cb5..467d86b7839 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes.go +++ b/pkg/cloudprovider/providers/azure/azure_routes.go @@ -32,7 +32,29 @@ import ( func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) { glog.V(10).Infof("ListRoutes: START clusterName=%q", clusterName) routeTable, existsRouteTable, err := az.getRouteTable() - return processRoutes(routeTable, existsRouteTable, err) + routes, err := processRoutes(routeTable, existsRouteTable, err) + if err != nil { + return nil, err + } + + // Compose routes for unmanaged nodes so that node controller won't retry creating routes for them. + unmanagedNodes, err := az.GetUnmanagedNodes() + if err != nil { + return nil, err + } + az.routeCIDRsLock.Lock() + defer az.routeCIDRsLock.Unlock() + for _, nodeName := range unmanagedNodes.List() { + if cidr, ok := az.routeCIDRs[nodeName]; ok { + routes = append(routes, &cloudprovider.Route{ + Name: nodeName, + TargetNode: mapRouteNameToNodeName(nodeName), + DestinationCIDR: cidr, + }) + } + } + + return routes, nil } // Injectable for testing @@ -108,12 +130,16 @@ func (az *Cloud) createRouteTable() error { // to create a more user-meaningful name. 
func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, kubeRoute *cloudprovider.Route) error { // Returns for unmanaged nodes because azure cloud provider couldn't fetch information for them. - unmanaged, err := az.IsNodeUnmanaged(string(kubeRoute.TargetNode)) + nodeName := string(kubeRoute.TargetNode) + unmanaged, err := az.IsNodeUnmanaged(nodeName) if err != nil { return err } if unmanaged { glog.V(2).Infof("CreateRoute: omitting unmanaged node %q", kubeRoute.TargetNode) + az.routeCIDRsLock.Lock() + defer az.routeCIDRsLock.Unlock() + az.routeCIDRs[nodeName] = kubeRoute.DestinationCIDR return nil } @@ -161,12 +187,16 @@ func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint s // Route should be as returned by ListRoutes func (az *Cloud) DeleteRoute(ctx context.Context, clusterName string, kubeRoute *cloudprovider.Route) error { // Returns for unmanaged nodes because azure cloud provider couldn't fetch information for them. - unmanaged, err := az.IsNodeUnmanaged(string(kubeRoute.TargetNode)) + nodeName := string(kubeRoute.TargetNode) + unmanaged, err := az.IsNodeUnmanaged(nodeName) if err != nil { return err } if unmanaged { glog.V(2).Infof("DeleteRoute: omitting unmanaged node %q", kubeRoute.TargetNode) + az.routeCIDRsLock.Lock() + defer az.routeCIDRsLock.Unlock() + delete(az.routeCIDRs, nodeName) return nil } From 9bbd5043ead68b4708df466a95077417153c4d4d Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Wed, 29 Aug 2018 14:19:36 +0800 Subject: [PATCH 3/3] Add unit tests --- .../providers/azure/azure_routes_test.go | 50 +++++++++++++++++ .../providers/azure/azure_test.go | 1 + .../providers/azure/azure_wrap_test.go | 56 +++++++++++++++++++ 3 files changed, 107 insertions(+) diff --git a/pkg/cloudprovider/providers/azure/azure_routes_test.go b/pkg/cloudprovider/providers/azure/azure_routes_test.go index cc248ee52e6..1d251da63b2 100644 --- a/pkg/cloudprovider/providers/azure/azure_routes_test.go +++ 
b/pkg/cloudprovider/providers/azure/azure_routes_test.go @@ -22,6 +22,7 @@ import ( "reflect" "testing" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/cloudprovider" "github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network" @@ -38,6 +39,8 @@ func TestDeleteRoute(t *testing.T) { RouteTableName: "bar", Location: "location", }, + unmanagedNodes: sets.NewString(), + nodeInformerSynced: func() bool { return true }, } route := cloudprovider.Route{TargetNode: "node", DestinationCIDR: "1.2.3.4/24"} routeName := mapNodeNameToRouteName(route.TargetNode) @@ -62,6 +65,28 @@ func TestDeleteRoute(t *testing.T) { ob, found := mp[routeName] if found { t.Errorf("unexpectedly found: %v that should have been deleted.", ob) + t.FailNow() + } + + // test delete route for unmanaged nodes. + nodeName := "node1" + nodeCIDR := "4.3.2.1/24" + cloud.unmanagedNodes.Insert(nodeName) + cloud.routeCIDRs = map[string]string{ + nodeName: nodeCIDR, + } + route1 := cloudprovider.Route{ + TargetNode: mapRouteNameToNodeName(nodeName), + DestinationCIDR: nodeCIDR, + } + err = cloud.DeleteRoute(context.TODO(), "cluster", &route1) + if err != nil { + t.Errorf("unexpected error deleting route: %v", err) + t.FailNow() + } + cidr, found := cloud.routeCIDRs[nodeName] + if found { + t.Errorf("unexpected CIDR item (%q) for %s", cidr, nodeName) } } @@ -79,6 +104,8 @@ func TestCreateRoute(t *testing.T) { RouteTableName: "bar", Location: "location", }, + unmanagedNodes: sets.NewString(), + nodeInformerSynced: func() bool { return true }, } cache, _ := cloud.newRouteTableCache() cloud.rtCache = cache @@ -122,6 +149,29 @@ func TestCreateRoute(t *testing.T) { if *routeInfo.NextHopIPAddress != nodeIP { t.Errorf("Expected IP address: %s, saw %s", nodeIP, *routeInfo.NextHopIPAddress) } + + // test create route for unmanaged nodes. 
+ nodeName := "node1" + nodeCIDR := "4.3.2.1/24" + cloud.unmanagedNodes.Insert(nodeName) + cloud.routeCIDRs = map[string]string{} + route1 := cloudprovider.Route{ + TargetNode: mapRouteNameToNodeName(nodeName), + DestinationCIDR: nodeCIDR, + } + err = cloud.CreateRoute(context.TODO(), "cluster", "unused", &route1) + if err != nil { + t.Errorf("unexpected error creating route: %v", err) + t.FailNow() + } + cidr, found := cloud.routeCIDRs[nodeName] + if !found { + t.Errorf("unexpected missing item for %s", nodeName) + t.FailNow() + } + if cidr != nodeCIDR { + t.Errorf("unexpected cidr %s, saw %s", nodeCIDR, cidr) + } } func TestCreateRouteTableIfNotExists_Exists(t *testing.T) { diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go index bf694d84f3c..2d243a9e7a8 100644 --- a/pkg/cloudprovider/providers/azure/azure_test.go +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -958,6 +958,7 @@ func getTestCloud() (az *Cloud) { nodeInformerSynced: func() bool { return true }, nodeResourceGroups: map[string]string{}, unmanagedNodes: sets.NewString(), + routeCIDRs: map[string]string{}, } az.DisksClient = newFakeDisksClient() az.InterfacesClient = newFakeAzureInterfacesClient() diff --git a/pkg/cloudprovider/providers/azure/azure_wrap_test.go b/pkg/cloudprovider/providers/azure/azure_wrap_test.go index 5ab090c2a47..8c7ba0ee4da 100644 --- a/pkg/cloudprovider/providers/azure/azure_wrap_test.go +++ b/pkg/cloudprovider/providers/azure/azure_wrap_test.go @@ -23,6 +23,8 @@ import ( "testing" "github.com/Azure/go-autorest/autorest" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/sets" ) func TestExtractNotFound(t *testing.T) { @@ -51,3 +53,57 @@ func TestExtractNotFound(t *testing.T) { } } } + +func TestIsNodeUnmanaged(t *testing.T) { + tests := []struct { + name string + unmanagedNodes sets.String + node string + expected bool + expectErr bool + }{ + { + name: "unmanaged node should return true", + 
unmanagedNodes: sets.NewString("node1", "node2"), + node: "node1", + expected: true, + }, + { + name: "managed node should return false", + unmanagedNodes: sets.NewString("node1", "node2"), + node: "node3", + expected: false, + }, + { + name: "empty unmanagedNodes should return false", + unmanagedNodes: sets.NewString(), + node: "node3", + expected: false, + }, + { + name: "no synced informer should report error", + unmanagedNodes: sets.NewString(), + node: "node1", + expectErr: true, + }, + } + + az := getTestCloud() + for _, test := range tests { + az.unmanagedNodes = test.unmanagedNodes + if test.expectErr { + az.nodeInformerSynced = func() bool { + return false + } + } + + unmanaged, err := az.IsNodeUnmanaged(test.node) + if test.expectErr { + assert.Error(t, err, test.name) + continue + } + + assert.NoError(t, err, test.name) + assert.Equal(t, test.expected, unmanaged, test.name) + } +}