From 811e831b0a8bd20b847bd01dbe1d3d32654aed8d Mon Sep 17 00:00:00 2001
From: Pengfei Ni
Date: Mon, 23 Jul 2018 17:21:31 +0800
Subject: [PATCH] Choose availability zones from active nodes

---
 pkg/cloudprovider/providers/azure/azure.go | 98 +++++++++++++++++++
 .../azure/azure_managedDiskController.go | 41 +++++---
 2 files changed, 126 insertions(+), 13 deletions(-)

diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go
index 80a54f406de..04786b2d3a0 100644
--- a/pkg/cloudprovider/providers/azure/azure.go
+++ b/pkg/cloudprovider/providers/azure/azure.go
@@ -21,13 +21,19 @@ import (
     "io"
     "io/ioutil"
     "strings"
+    "sync"
     "time"
 
+    "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/informers"
+    "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/flowcontrol"
     "k8s.io/kubernetes/pkg/cloudprovider"
     "k8s.io/kubernetes/pkg/cloudprovider/providers/azure/auth"
     "k8s.io/kubernetes/pkg/controller"
+    kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
     "k8s.io/kubernetes/pkg/version"
 
     "github.com/Azure/go-autorest/autorest"
@@ -150,6 +156,13 @@ type Cloud struct {
     metadata *InstanceMetadata
     vmSet VMSet
 
+    // Lock for access to nodeZones
+    nodeZonesLock sync.Mutex
+    // nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone;
+    // it is updated by the nodeInformer
+    nodeZones map[string]sets.String
+    nodeInformerSynced cache.InformerSynced
+
     // Clients for vmss.
     VirtualMachineScaleSetsClient VirtualMachineScaleSetsClient
     VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient
@@ -243,6 +256,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
     az := Cloud{
         Config: *config,
         Environment: *env,
+        nodeZones: map[string]sets.String{},
 
         DisksClient: newAzDisksClient(azClientConfig),
         RoutesClient: newAzRoutesClient(azClientConfig),
@@ -427,3 +441,87 @@ func initDiskControllers(az *Cloud) error {
 
     return nil
 }
+
+// SetInformers sets informers for Azure cloud provider.
+func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
+    glog.Infof("Setting up informers for Azure cloud provider")
+    nodeInformer := informerFactory.Core().V1().Nodes().Informer()
+    nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj interface{}) {
+            node := obj.(*v1.Node)
+            az.updateNodeZones(nil, node)
+        },
+        UpdateFunc: func(prev, obj interface{}) {
+            prevNode := prev.(*v1.Node)
+            newNode := obj.(*v1.Node)
+            if newNode.Labels[kubeletapis.LabelZoneFailureDomain] ==
+                prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
+                return
+            }
+            az.updateNodeZones(prevNode, newNode)
+        },
+        DeleteFunc: func(obj interface{}) {
+            node, isNode := obj.(*v1.Node)
+            // We can get DeletedFinalStateUnknown instead of *v1.Node here
+            // and we need to handle that correctly.
+            if !isNode {
+                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
+                if !ok {
+                    glog.Errorf("Received unexpected object: %v", obj)
+                    return
+                }
+                node, ok = deletedState.Obj.(*v1.Node)
+                if !ok {
+                    glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
+                    return
+                }
+            }
+            az.updateNodeZones(node, nil)
+        },
+    })
+    az.nodeInformerSynced = nodeInformer.HasSynced
+}
+
+func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
+    az.nodeZonesLock.Lock()
+    defer az.nodeZonesLock.Unlock()
+    if prevNode != nil {
+        prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
+        if ok && az.isAvailabilityZone(prevZone) {
+            az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
+            if az.nodeZones[prevZone].Len() == 0 {
+                az.nodeZones[prevZone] = nil
+            }
+        }
+    }
+    if newNode != nil {
+        newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
+        if ok && az.isAvailabilityZone(newZone) {
+            if az.nodeZones[newZone] == nil {
+                az.nodeZones[newZone] = sets.NewString()
+            }
+            az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
+        }
+    }
+}
+
+// GetActiveZones returns all the zones in which k8s nodes are currently running.
+func (az *Cloud) GetActiveZones() (sets.String, error) {
+    if az.nodeInformerSynced == nil {
+        return nil, fmt.Errorf("Azure cloud provider doesn't have informers set")
+    }
+
+    az.nodeZonesLock.Lock()
+    defer az.nodeZonesLock.Unlock()
+    if !az.nodeInformerSynced() {
+        return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones")
+    }
+
+    zones := sets.NewString()
+    for zone, nodes := range az.nodeZones {
+        if len(nodes) > 0 {
+            zones.Insert(zone)
+        }
+    }
+    return zones, nil
+}
diff --git a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go
index 8fa4515c493..fd780ffece1 100644
--- a/pkg/cloudprovider/providers/azure/azure_managedDiskController.go
+++ b/pkg/cloudprovider/providers/azure/azure_managedDiskController.go
@@ -29,6 +29,7 @@ import (
 
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
+    "k8s.io/apimachinery/pkg/util/sets"
     kwait "k8s.io/apimachinery/pkg/util/wait"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
     "k8s.io/kubernetes/pkg/volume"
@@ -61,26 +62,33 @@ func newManagedDiskController(common *controllerCommon) (*ManagedDiskController,
 
 //CreateManagedDisk : create managed disk
 func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) (string, error) {
+    var zones sets.String
+    var activeZones sets.String
+    var err error
     glog.V(4).Infof("azureDisk - creating new managed Name:%s StorageAccountType:%s Size:%v", options.DiskName, options.StorageAccountType, options.SizeGB)
 
+    // Get active zones which have running nodes.
+    activeZones, err = c.common.cloud.GetActiveZones()
+    if err != nil {
+        return "", fmt.Errorf("error querying active zones: %v", err)
+    }
+
     // Validate and choose availability zone for creating disk.
-    var createAZ string
     if options.Zoned && !options.ZonePresent && !options.ZonesPresent {
-        // TODO: get zones from active zones that with running nodes.
-    }
-    if !options.ZonePresent && options.ZonesPresent {
+        // Neither "zone" nor "zones" specified. Pick a zone randomly selected
+        // from all active zones where the Kubernetes cluster has a node.
+        zones = activeZones
+    } else if !options.ZonePresent && options.ZonesPresent {
         // Choose zone from specified zones.
-        if adminSetOfZones, err := util.ZonesToSet(options.AvailabilityZones); err != nil {
+        if zones, err = util.ZonesToSet(options.AvailabilityZones); err != nil {
             return "", err
-        } else {
-            createAZ = util.ChooseZoneForVolume(adminSetOfZones, options.PVCName)
         }
-    }
-    if options.ZonePresent && !options.ZonesPresent {
+    } else if options.ZonePresent && !options.ZonesPresent {
         if err := util.ValidateZone(options.AvailabilityZone); err != nil {
             return "", err
         }
-        createAZ = options.AvailabilityZone
+        zones = make(sets.String)
+        zones.Insert(options.AvailabilityZone)
     }
 
     // insert original tags to newTags
@@ -112,14 +120,21 @@ func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) (
     if options.ResourceGroup == "" {
         options.ResourceGroup = c.common.resourceGroup
     }
-    if createAZ != "" {
-        createZones := []string{createAZ}
+    if len(zones.List()) > 0 {
+        createAZ := util.ChooseZoneForVolume(zones, options.PVCName)
+        // Do not allow creation of disks in zones that do not have nodes. Such disks
+        // are not currently usable.
+        if !activeZones.Has(createAZ) {
+            return "", fmt.Errorf("kubernetes does not have a node in zone %q", createAZ)
+        }
+
+        createZones := []string{c.common.cloud.GetZoneID(createAZ)}
         model.Zones = &createZones
     }
 
     ctx, cancel := getContextWithCancel()
     defer cancel()
-    _, err := c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model)
+    _, err = c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model)
     if err != nil {
         return "", err
     }