Choose availability zones from active nodes
commit 811e831b0a (parent 74813d0d26)
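Track which availability zones currently contain nodes by watching Node objects through a shared informer (SetInformers / updateNodeZones / GetActiveZones), and use that mapping when creating managed disks: when no zone is specified, pick one from the active zones; when a zone or zone list is specified, validate it and reject any zone that has no running node, since such a disk would not currently be usable.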
@@ -21,13 +21,19 @@ import (
     "io"
     "io/ioutil"
     "strings"
+    "sync"
     "time"
 
+    "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/informers"
+    "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/util/flowcontrol"
     "k8s.io/kubernetes/pkg/cloudprovider"
     "k8s.io/kubernetes/pkg/cloudprovider/providers/azure/auth"
     "k8s.io/kubernetes/pkg/controller"
+    kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
     "k8s.io/kubernetes/pkg/version"
 
     "github.com/Azure/go-autorest/autorest"
@@ -150,6 +156,13 @@ type Cloud struct {
     metadata *InstanceMetadata
     vmSet    VMSet
 
+    // Lock for access to nodeZones
+    nodeZonesLock sync.Mutex
+    // nodeZones is a mapping from Zone to a sets.String of Node's names in the Zone
+    // it is updated by the nodeInformer
+    nodeZones          map[string]sets.String
+    nodeInformerSynced cache.InformerSynced
+
     // Clients for vmss.
     VirtualMachineScaleSetsClient   VirtualMachineScaleSetsClient
     VirtualMachineScaleSetVMsClient VirtualMachineScaleSetVMsClient
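For readers unfamiliar with apimachinery's string sets, here is a tiny standalone sketch (illustrative names, not part of the commit) of the zone-to-node-names bookkeeping these new fields hold, mirroring what updateNodeZones does further down:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/sets"
)

func main() {
    // Zone label -> set of node names, as in Cloud.nodeZones.
    nodeZones := map[string]sets.String{}

    zone, node := "eastus2-1", "node-a" // hypothetical values

    // Add a node to its zone, allocating the set on first use.
    if nodeZones[zone] == nil {
        nodeZones[zone] = sets.NewString()
    }
    nodeZones[zone].Insert(node)
    fmt.Println(nodeZones[zone].Has(node)) // true

    // Remove it again; empty sets are reset to nil, as updateNodeZones does.
    nodeZones[zone].Delete(node)
    if nodeZones[zone].Len() == 0 {
        nodeZones[zone] = nil
    }
}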
@@ -243,6 +256,7 @@ func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) {
     az := Cloud{
         Config:      *config,
         Environment: *env,
+        nodeZones:   map[string]sets.String{},
 
         DisksClient:  newAzDisksClient(azClientConfig),
         RoutesClient: newAzRoutesClient(azClientConfig),
@@ -427,3 +441,87 @@ func initDiskControllers(az *Cloud) error {
 
     return nil
 }
+
+// SetInformers sets informers for Azure cloud provider.
+func (az *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
+    glog.Infof("Setting up informers for Azure cloud provider")
+    nodeInformer := informerFactory.Core().V1().Nodes().Informer()
+    nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj interface{}) {
+            node := obj.(*v1.Node)
+            az.updateNodeZones(nil, node)
+        },
+        UpdateFunc: func(prev, obj interface{}) {
+            prevNode := prev.(*v1.Node)
+            newNode := obj.(*v1.Node)
+            if newNode.Labels[kubeletapis.LabelZoneFailureDomain] ==
+                prevNode.Labels[kubeletapis.LabelZoneFailureDomain] {
+                return
+            }
+            az.updateNodeZones(prevNode, newNode)
+        },
+        DeleteFunc: func(obj interface{}) {
+            node, isNode := obj.(*v1.Node)
+            // We can get DeletedFinalStateUnknown instead of *v1.Node here
+            // and we need to handle that correctly.
+            if !isNode {
+                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
+                if !ok {
+                    glog.Errorf("Received unexpected object: %v", obj)
+                    return
+                }
+                node, ok = deletedState.Obj.(*v1.Node)
+                if !ok {
+                    glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
+                    return
+                }
+            }
+            az.updateNodeZones(node, nil)
+        },
+    })
+    az.nodeInformerSynced = nodeInformer.HasSynced
+}
+
+func (az *Cloud) updateNodeZones(prevNode, newNode *v1.Node) {
+    az.nodeZonesLock.Lock()
+    defer az.nodeZonesLock.Unlock()
+    if prevNode != nil {
+        prevZone, ok := prevNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
+        if ok && az.isAvailabilityZone(prevZone) {
+            az.nodeZones[prevZone].Delete(prevNode.ObjectMeta.Name)
+            if az.nodeZones[prevZone].Len() == 0 {
+                az.nodeZones[prevZone] = nil
+            }
+        }
+    }
+    if newNode != nil {
+        newZone, ok := newNode.ObjectMeta.Labels[kubeletapis.LabelZoneFailureDomain]
+        if ok && az.isAvailabilityZone(newZone) {
+            if az.nodeZones[newZone] == nil {
+                az.nodeZones[newZone] = sets.NewString()
+            }
+            az.nodeZones[newZone].Insert(newNode.ObjectMeta.Name)
+        }
+    }
+}
+
+// GetActiveZones returns all the zones in which k8s nodes are currently running.
+func (az *Cloud) GetActiveZones() (sets.String, error) {
+    if az.nodeInformerSynced == nil {
+        return nil, fmt.Errorf("Azure cloud provider doesn't have informers set")
+    }
+
+    az.nodeZonesLock.Lock()
+    defer az.nodeZonesLock.Unlock()
+    if !az.nodeInformerSynced() {
+        return nil, fmt.Errorf("node informer is not synced when trying to GetActiveZones")
+    }
+
+    zones := sets.NewString()
+    for zone, nodes := range az.nodeZones {
+        if len(nodes) > 0 {
+            zones.Insert(zone)
+        }
+    }
+    return zones, nil
+}
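For context, here is a minimal, hypothetical wiring sketch (not part of the commit) of how a controller-manager-style caller could hand SetInformers a shared factory and later query GetActiveZones. The kubeconfig path and the way the *Cloud value is obtained are assumptions:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig") // assumed path
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(config)

    // One shared factory for all informers, resyncing every 30 seconds.
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)

    // az.SetInformers(factory) would register the node handlers shown above;
    // obtaining `az` (the *Cloud from this diff) is elided in this sketch.

    stop := make(chan struct{})
    defer close(stop)
    factory.Start(stop)
    factory.WaitForCacheSync(stop)

    // Once the node informer has synced, az.GetActiveZones() returns the set
    // of zones that currently contain at least one node.
    fmt.Println("node informer started and synced")
}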
@@ -29,6 +29,7 @@ import (
 
     "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
+    "k8s.io/apimachinery/pkg/util/sets"
     kwait "k8s.io/apimachinery/pkg/util/wait"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
    "k8s.io/kubernetes/pkg/volume"
@@ -61,26 +62,33 @@ func newManagedDiskController(common *controllerCommon) (*ManagedDiskController,
 
 //CreateManagedDisk : create managed disk
 func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) (string, error) {
+    var zones sets.String
+    var activeZones sets.String
+    var err error
     glog.V(4).Infof("azureDisk - creating new managed Name:%s StorageAccountType:%s Size:%v", options.DiskName, options.StorageAccountType, options.SizeGB)
 
+    // Get active zones which have nodes running on.
+    activeZones, err = c.common.cloud.GetActiveZones()
+    if err != nil {
+        return "", fmt.Errorf("error querying active zones: %v", err)
+    }
+
     // Validate and choose availability zone for creating disk.
-    var createAZ string
     if options.Zoned && !options.ZonePresent && !options.ZonesPresent {
-        // TODO: get zones from active zones that with running nodes.
-    }
-    if !options.ZonePresent && options.ZonesPresent {
+        // Neither "zone" or "zones" specified. Pick a zone randomly selected
+        // from all active zones where Kubernetes cluster has a node.
+        zones = activeZones
+    } else if !options.ZonePresent && options.ZonesPresent {
         // Choose zone from specified zones.
-        if adminSetOfZones, err := util.ZonesToSet(options.AvailabilityZones); err != nil {
+        if zones, err = util.ZonesToSet(options.AvailabilityZones); err != nil {
             return "", err
-        } else {
-            createAZ = util.ChooseZoneForVolume(adminSetOfZones, options.PVCName)
         }
-    }
-    if options.ZonePresent && !options.ZonesPresent {
+    } else if options.ZonePresent && !options.ZonesPresent {
         if err := util.ValidateZone(options.AvailabilityZone); err != nil {
             return "", err
         }
-        createAZ = options.AvailabilityZone
+        zones = make(sets.String)
+        zones.Insert(options.AvailabilityZone)
     }
 
     // insert original tags to newTags
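The deterministic spreading itself happens in util.ChooseZoneForVolume (used in the next hunk). This is a rough standalone sketch of the underlying idea only, not the real helper, which roughly also special-cases StatefulSet-style PVC names ending in an ordinal: hash the PVC name and index into the sorted zone list, so different PVCs land in different zones while the same PVC always maps to the same zone.

package main

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// chooseZone is a simplified stand-in for util.ChooseZoneForVolume.
func chooseZone(zones []string, pvcName string) string {
    sort.Strings(zones) // stable order so the hash index is reproducible
    h := fnv.New32a()
    h.Write([]byte(pvcName))
    return zones[h.Sum32()%uint32(len(zones))]
}

func main() {
    zones := []string{"eastus2-1", "eastus2-2", "eastus2-3"} // hypothetical
    for _, pvc := range []string{"data-db-0", "data-db-1", "data-db-2"} {
        fmt.Printf("%s -> %s\n", pvc, chooseZone(zones, pvc))
    }
}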
@@ -112,14 +120,21 @@ func (c *ManagedDiskController) CreateManagedDisk(options *ManagedDiskOptions) (
     if options.ResourceGroup == "" {
         options.ResourceGroup = c.common.resourceGroup
     }
-    if createAZ != "" {
-        createZones := []string{createAZ}
+    if len(zones.List()) > 0 {
+        createAZ := util.ChooseZoneForVolume(zones, options.PVCName)
+        // Do not allow creation of disks in zones that are do not have nodes. Such disks
+        // are not currently usable.
+        if !activeZones.Has(createAZ) {
+            return "", fmt.Errorf("kubernetes does not have a node in zone %q", createAZ)
+        }
+
+        createZones := []string{c.common.cloud.GetZoneID(createAZ)}
         model.Zones = &createZones
     }
 
     ctx, cancel := getContextWithCancel()
     defer cancel()
-    _, err := c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model)
+    _, err = c.common.cloud.DisksClient.CreateOrUpdate(ctx, options.ResourceGroup, options.DiskName, model)
     if err != nil {
         return "", err
     }
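Two details worth noting in this last hunk. First, the CreateOrUpdate call switches from `:=` to `=` because `err` is now declared once at the top of the function. Second, the chosen zone passes through c.common.cloud.GetZoneID before being placed in the ARM model, presumably because the Azure API expects the numeric zone ID (e.g. "1") rather than the region-prefixed failure-domain label (e.g. "eastus2-1"). The active-zone check matters because an Azure zonal disk can only be attached to VMs in that same zone, so a disk created in a zone with no nodes could never be mounted by the cluster.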