mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-31 05:40:42 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			809 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			809 lines
		
	
	
		
			26 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2017 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package azure
 | |
| 
 | |
| import (
 | |
| 	"bytes"
 | |
| 	"encoding/binary"
 | |
| 	"fmt"
 | |
| 	"math"
 | |
| 	"net/url"
 | |
| 	"os"
 | |
| 	"regexp"
 | |
| 	"sync"
 | |
| 
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	storage "github.com/Azure/azure-sdk-for-go/arm/storage"
 | |
| 	azstorage "github.com/Azure/azure-sdk-for-go/storage"
 | |
| 	"github.com/Azure/go-autorest/autorest/to"
 | |
| 	"github.com/golang/glog"
 | |
| 	"github.com/rubiojr/go-vhd/vhd"
 | |
| 	kwait "k8s.io/apimachinery/pkg/util/wait"
 | |
| 	"k8s.io/kubernetes/pkg/volume"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	vhdContainerName         = "vhds"
 | |
| 	useHTTPSForBlobBasedDisk = true
 | |
| 	blobServiceName          = "blob"
 | |
| )
 | |
| 
 | |
| type storageAccountState struct {
 | |
| 	name                    string
 | |
| 	saType                  storage.SkuName
 | |
| 	key                     string
 | |
| 	diskCount               int32
 | |
| 	isValidating            int32
 | |
| 	defaultContainerCreated bool
 | |
| }
 | |
| 
 | |
| //BlobDiskController : blob disk controller struct
 | |
| type BlobDiskController struct {
 | |
| 	common   *controllerCommon
 | |
| 	accounts map[string]*storageAccountState
 | |
| }
 | |
| 
 | |
| var defaultContainerName = ""
 | |
| var storageAccountNamePrefix = ""
 | |
| var storageAccountNameMatch = ""
 | |
| var initFlag int64
 | |
| 
 | |
| var accountsLock = &sync.Mutex{}
 | |
| 
 | |
| func newBlobDiskController(common *controllerCommon) (*BlobDiskController, error) {
 | |
| 	c := BlobDiskController{common: common}
 | |
| 	err := c.init()
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &c, nil
 | |
| }
 | |
| 
 | |
| // CreateVolume creates a VHD blob in a given storage account, will create the given storage account if it does not exist in current resource group
 | |
| func (c *BlobDiskController) CreateVolume(name, storageAccount string, storageAccountType storage.SkuName, location string, requestGB int) (string, string, int, error) {
 | |
| 	key, err := c.common.cloud.getStorageAccesskey(storageAccount)
 | |
| 	if err != nil {
 | |
| 		glog.V(2).Infof("azureDisk - no key found for storage account %s in resource group %s, begin to create a new storage account", storageAccount, c.common.resourceGroup)
 | |
| 
 | |
| 		cp := storage.AccountCreateParameters{
 | |
| 			Sku:      &storage.Sku{Name: storageAccountType},
 | |
| 			Tags:     &map[string]*string{"created-by": to.StringPtr("azure-dd")},
 | |
| 			Location: &location}
 | |
| 		cancel := make(chan struct{})
 | |
| 
 | |
| 		_, errchan := c.common.cloud.StorageAccountClient.Create(c.common.resourceGroup, storageAccount, cp, cancel)
 | |
| 		err = <-errchan
 | |
| 		if err != nil {
 | |
| 			return "", "", 0, fmt.Errorf(fmt.Sprintf("Create Storage Account %s, error: %s", storageAccount, err))
 | |
| 		}
 | |
| 
 | |
| 		key, err = c.common.cloud.getStorageAccesskey(storageAccount)
 | |
| 		if err != nil {
 | |
| 			return "", "", 0, fmt.Errorf("no key found for storage account %s even after creating a new storage account", storageAccount)
 | |
| 		}
 | |
| 
 | |
| 		glog.Errorf("no key found for storage account %s in resource group %s", storageAccount, c.common.resourceGroup)
 | |
| 		return "", "", 0, err
 | |
| 	}
 | |
| 
 | |
| 	client, err := azstorage.NewBasicClientOnSovereignCloud(storageAccount, key, c.common.cloud.Environment)
 | |
| 	if err != nil {
 | |
| 		return "", "", 0, err
 | |
| 	}
 | |
| 	blobClient := client.GetBlobService()
 | |
| 
 | |
| 	container := blobClient.GetContainerReference(vhdContainerName)
 | |
| 	_, err = container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
 | |
| 	if err != nil {
 | |
| 		return "", "", 0, err
 | |
| 	}
 | |
| 
 | |
| 	diskName, diskURI, err := c.createVHDBlobDisk(blobClient, storageAccount, name, vhdContainerName, int64(requestGB))
 | |
| 	if err != nil {
 | |
| 		return "", "", 0, err
 | |
| 	}
 | |
| 
 | |
| 	glog.V(4).Infof("azureDisk - created vhd blob uri: %s", diskURI)
 | |
| 	return diskName, diskURI, requestGB, err
 | |
| }
 | |
| 
 | |
| // DeleteVolume deletes a VHD blob
 | |
| func (c *BlobDiskController) DeleteVolume(diskURI string) error {
 | |
| 	glog.V(4).Infof("azureDisk - begin to delete volume %s", diskURI)
 | |
| 	accountName, blob, err := c.common.cloud.getBlobNameAndAccountFromURI(diskURI)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to parse vhd URI %v", err)
 | |
| 	}
 | |
| 	key, err := c.common.cloud.getStorageAccesskey(accountName)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("no key for storage account %s, err %v", accountName, err)
 | |
| 	}
 | |
| 	err = c.common.cloud.deleteVhdBlob(accountName, key, blob)
 | |
| 	if err != nil {
 | |
| 		glog.Warningf("azureDisk - failed to delete blob %s err: %v", diskURI, err)
 | |
| 		detail := err.Error()
 | |
| 		if strings.Contains(detail, errLeaseIDMissing) {
 | |
| 			// disk is still being used
 | |
| 			// see https://msdn.microsoft.com/en-us/library/microsoft.windowsazure.storage.blob.protocol.bloberrorcodestrings.leaseidmissing.aspx
 | |
| 			return volume.NewDeletedVolumeInUseError(fmt.Sprintf("disk %q is still in use while being deleted", diskURI))
 | |
| 		}
 | |
| 		return fmt.Errorf("failed to delete vhd %v, account %s, blob %s, err: %v", diskURI, accountName, blob, err)
 | |
| 	}
 | |
| 	glog.V(4).Infof("azureDisk - blob %s deleted", diskURI)
 | |
| 	return nil
 | |
| 
 | |
| }
 | |
| 
 | |
| // get diskURI https://foo.blob.core.windows.net/vhds/bar.vhd and return foo (account) and bar.vhd (blob name)
 | |
| func (c *BlobDiskController) getBlobNameAndAccountFromURI(diskURI string) (string, string, error) {
 | |
| 	scheme := "http"
 | |
| 	if useHTTPSForBlobBasedDisk {
 | |
| 		scheme = "https"
 | |
| 	}
 | |
| 	host := fmt.Sprintf("%s://(.*).%s.%s", scheme, blobServiceName, c.common.storageEndpointSuffix)
 | |
| 	reStr := fmt.Sprintf("%s/%s/(.*)", host, vhdContainerName)
 | |
| 	re := regexp.MustCompile(reStr)
 | |
| 	res := re.FindSubmatch([]byte(diskURI))
 | |
| 	if len(res) < 3 {
 | |
| 		return "", "", fmt.Errorf("invalid vhd URI for regex %s: %s", reStr, diskURI)
 | |
| 	}
 | |
| 	return string(res[1]), string(res[2]), nil
 | |
| }
 | |
| 
 | |
| func (c *BlobDiskController) createVHDBlobDisk(blobClient azstorage.BlobStorageClient, accountName, vhdName, containerName string, sizeGB int64) (string, string, error) {
 | |
| 	container := blobClient.GetContainerReference(containerName)
 | |
| 	_, err := container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
 | |
| 	if err != nil {
 | |
| 		return "", "", err
 | |
| 	}
 | |
| 
 | |
| 	size := 1024 * 1024 * 1024 * sizeGB
 | |
| 	vhdSize := size + vhd.VHD_HEADER_SIZE /* header size */
 | |
| 	// Blob name in URL must end with '.vhd' extension.
 | |
| 	vhdName = vhdName + ".vhd"
 | |
| 
 | |
| 	tags := make(map[string]string)
 | |
| 	tags["createdby"] = "k8sAzureDataDisk"
 | |
| 	glog.V(4).Infof("azureDisk - creating page blob %name in container %s account %s", vhdName, containerName, accountName)
 | |
| 
 | |
| 	blob := container.GetBlobReference(vhdName)
 | |
| 	blob.Properties.ContentLength = vhdSize
 | |
| 	blob.Metadata = tags
 | |
| 	err = blob.PutPageBlob(nil)
 | |
| 	if err != nil {
 | |
| 		return "", "", fmt.Errorf("failed to put page blob %s in container %s: %v", vhdName, containerName, err)
 | |
| 	}
 | |
| 
 | |
| 	// add VHD signature to the blob
 | |
| 	h, err := createVHDHeader(uint64(size))
 | |
| 	if err != nil {
 | |
| 		blob.DeleteIfExists(nil)
 | |
| 		return "", "", fmt.Errorf("failed to create vhd header, err: %v", err)
 | |
| 	}
 | |
| 
 | |
| 	blobRange := azstorage.BlobRange{
 | |
| 		Start: uint64(size),
 | |
| 		End:   uint64(vhdSize - 1),
 | |
| 	}
 | |
| 	if err = blob.WriteRange(blobRange, bytes.NewBuffer(h[:vhd.VHD_HEADER_SIZE]), nil); err != nil {
 | |
| 		glog.Infof("azureDisk - failed to put header page for data disk %s in container %s account %s, error was %s\n",
 | |
| 			vhdName, containerName, accountName, err.Error())
 | |
| 		return "", "", err
 | |
| 	}
 | |
| 
 | |
| 	scheme := "http"
 | |
| 	if useHTTPSForBlobBasedDisk {
 | |
| 		scheme = "https"
 | |
| 	}
 | |
| 
 | |
| 	host := fmt.Sprintf("%s://%s.%s.%s", scheme, accountName, blobServiceName, c.common.storageEndpointSuffix)
 | |
| 	uri := fmt.Sprintf("%s/%s/%s", host, containerName, vhdName)
 | |
| 	return vhdName, uri, nil
 | |
| }
 | |
| 
 | |
| // delete a vhd blob
 | |
| func (c *BlobDiskController) deleteVhdBlob(accountName, accountKey, blobName string) error {
 | |
| 	client, err := azstorage.NewBasicClientOnSovereignCloud(accountName, accountKey, c.common.cloud.Environment)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	blobSvc := client.GetBlobService()
 | |
| 
 | |
| 	container := blobSvc.GetContainerReference(vhdContainerName)
 | |
| 	blob := container.GetBlobReference(blobName)
 | |
| 	return blob.Delete(nil)
 | |
| }
 | |
| 
 | |
| //CreateBlobDisk : create a blob disk in a node
 | |
| func (c *BlobDiskController) CreateBlobDisk(dataDiskName string, storageAccountType storage.SkuName, sizeGB int, forceStandAlone bool) (string, error) {
 | |
| 	glog.V(4).Infof("azureDisk - creating blob data disk named:%s on StorageAccountType:%s StandAlone:%v", dataDiskName, storageAccountType, forceStandAlone)
 | |
| 
 | |
| 	var storageAccountName = ""
 | |
| 	var err error
 | |
| 
 | |
| 	if forceStandAlone {
 | |
| 		// we have to wait until the storage account is is created
 | |
| 		storageAccountName = "p" + MakeCRC32(c.common.subscriptionID+c.common.resourceGroup+dataDiskName)
 | |
| 		err = c.createStorageAccount(storageAccountName, storageAccountType, c.common.location, false)
 | |
| 		if err != nil {
 | |
| 			return "", err
 | |
| 		}
 | |
| 	} else {
 | |
| 		storageAccountName, err = c.findSANameForDisk(storageAccountType)
 | |
| 		if err != nil {
 | |
| 			return "", err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	blobClient, err := c.getBlobSvcClient(storageAccountName)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	_, diskURI, err := c.createVHDBlobDisk(blobClient, storageAccountName, dataDiskName, defaultContainerName, int64(sizeGB))
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 
 | |
| 	if !forceStandAlone {
 | |
| 		atomic.AddInt32(&c.accounts[storageAccountName].diskCount, 1)
 | |
| 	}
 | |
| 
 | |
| 	return diskURI, nil
 | |
| }
 | |
| 
 | |
| //DeleteBlobDisk : delete a blob disk from a node
 | |
| func (c *BlobDiskController) DeleteBlobDisk(diskURI string, wasForced bool) error {
 | |
| 	storageAccountName, vhdName, err := diskNameandSANameFromURI(diskURI)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	_, ok := c.accounts[storageAccountName]
 | |
| 	if !ok {
 | |
| 		// the storage account is specified by user
 | |
| 		glog.V(4).Infof("azureDisk - deleting volume %s", diskURI)
 | |
| 		return c.DeleteVolume(diskURI)
 | |
| 	}
 | |
| 	// if forced (as in one disk = one storage account)
 | |
| 	// delete the account completely
 | |
| 	if wasForced {
 | |
| 		return c.deleteStorageAccount(storageAccountName)
 | |
| 	}
 | |
| 
 | |
| 	blobSvc, err := c.getBlobSvcClient(storageAccountName)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	glog.V(4).Infof("azureDisk - About to delete vhd file %s on storage account %s container %s", vhdName, storageAccountName, defaultContainerName)
 | |
| 
 | |
| 	container := blobSvc.GetContainerReference(defaultContainerName)
 | |
| 	blob := container.GetBlobReference(vhdName)
 | |
| 	_, err = blob.DeleteIfExists(nil)
 | |
| 
 | |
| 	if c.accounts[storageAccountName].diskCount == -1 {
 | |
| 		if diskCount, err := c.getDiskCount(storageAccountName); err != nil {
 | |
| 			c.accounts[storageAccountName].diskCount = int32(diskCount)
 | |
| 		} else {
 | |
| 			glog.Warningf("azureDisk - failed to get disk count for %s however the delete disk operation was ok", storageAccountName)
 | |
| 			return nil // we have failed to aquire a new count. not an error condition
 | |
| 		}
 | |
| 	}
 | |
| 	atomic.AddInt32(&c.accounts[storageAccountName].diskCount, -1)
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| // Init tries best effort to ensure that 2 accounts standard/premium were created
 | |
| // to be used by shared blob disks. This to increase the speed pvc provisioning (in most of cases)
 | |
| func (c *BlobDiskController) init() error {
 | |
| 	if !c.shouldInit() {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	c.setUniqueStrings()
 | |
| 
 | |
| 	// get accounts
 | |
| 	accounts, err := c.getAllStorageAccounts()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	c.accounts = accounts
 | |
| 
 | |
| 	if len(c.accounts) == 0 {
 | |
| 		counter := 1
 | |
| 		for counter <= storageAccountsCountInit {
 | |
| 
 | |
| 			accountType := storage.PremiumLRS
 | |
| 			if n := math.Mod(float64(counter), 2); n == 0 {
 | |
| 				accountType = storage.StandardLRS
 | |
| 			}
 | |
| 
 | |
| 			// We don't really care if these calls failed
 | |
| 			// at this stage, we are trying to ensure 2 accounts (Standard/Premium)
 | |
| 			// are there ready for PVC creation
 | |
| 
 | |
| 			// if we failed here, the accounts will be created in the process
 | |
| 			// of creating PVC
 | |
| 
 | |
| 			// nor do we care if they were partially created, as the entire
 | |
| 			// account creation process is idempotent
 | |
| 			go func(thisNext int) {
 | |
| 				newAccountName := getAccountNameForNum(thisNext)
 | |
| 
 | |
| 				glog.Infof("azureDisk - BlobDiskController init process  will create new storageAccount:%s type:%s", newAccountName, accountType)
 | |
| 				err := c.createStorageAccount(newAccountName, accountType, c.common.location, true)
 | |
| 				// TODO return created and error from
 | |
| 				if err != nil {
 | |
| 					glog.Infof("azureDisk - BlobDiskController init: create account %s with error:%s", newAccountName, err.Error())
 | |
| 
 | |
| 				} else {
 | |
| 					glog.Infof("azureDisk - BlobDiskController init: created account %s", newAccountName)
 | |
| 				}
 | |
| 			}(counter)
 | |
| 			counter = counter + 1
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| //Sets unique strings to be used as accountnames && || blob containers names
 | |
| func (c *BlobDiskController) setUniqueStrings() {
 | |
| 	uniqueString := c.common.resourceGroup + c.common.location + c.common.subscriptionID
 | |
| 	hash := MakeCRC32(uniqueString)
 | |
| 	//used to generate a unqie container name used by this cluster PVC
 | |
| 	defaultContainerName = hash
 | |
| 
 | |
| 	storageAccountNamePrefix = fmt.Sprintf(storageAccountNameTemplate, hash)
 | |
| 	// Used to filter relevant accounts (accounts used by shared PVC)
 | |
| 	storageAccountNameMatch = storageAccountNamePrefix
 | |
| 	// Used as a template to create new names for relevant accounts
 | |
| 	storageAccountNamePrefix = storageAccountNamePrefix + "%s"
 | |
| }
 | |
| func (c *BlobDiskController) getStorageAccountKey(SAName string) (string, error) {
 | |
| 	if account, exists := c.accounts[SAName]; exists && account.key != "" {
 | |
| 		return c.accounts[SAName].key, nil
 | |
| 	}
 | |
| 	listKeysResult, err := c.common.cloud.StorageAccountClient.ListKeys(c.common.resourceGroup, SAName)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	if listKeysResult.Keys == nil {
 | |
| 		return "", fmt.Errorf("azureDisk - empty listKeysResult in storage account:%s keys", SAName)
 | |
| 	}
 | |
| 	for _, v := range *listKeysResult.Keys {
 | |
| 		if v.Value != nil && *v.Value == "key1" {
 | |
| 			if _, ok := c.accounts[SAName]; !ok {
 | |
| 				glog.Warningf("azureDisk - account %s was not cached while getting keys", SAName)
 | |
| 				return *v.Value, nil
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		c.accounts[SAName].key = *v.Value
 | |
| 		return c.accounts[SAName].key, nil
 | |
| 	}
 | |
| 
 | |
| 	return "", fmt.Errorf("couldn't find key named key1 in storage account:%s keys", SAName)
 | |
| }
 | |
| 
 | |
| func (c *BlobDiskController) getBlobSvcClient(SAName string) (azstorage.BlobStorageClient, error) {
 | |
| 	key := ""
 | |
| 	var client azstorage.Client
 | |
| 	var blobSvc azstorage.BlobStorageClient
 | |
| 	var err error
 | |
| 	if key, err = c.getStorageAccountKey(SAName); err != nil {
 | |
| 		return blobSvc, err
 | |
| 	}
 | |
| 
 | |
| 	if client, err = azstorage.NewBasicClientOnSovereignCloud(SAName, key, c.common.cloud.Environment); err != nil {
 | |
| 		return blobSvc, err
 | |
| 	}
 | |
| 
 | |
| 	blobSvc = client.GetBlobService()
 | |
| 	return blobSvc, nil
 | |
| }
 | |
| 
 | |
| func (c *BlobDiskController) ensureDefaultContainer(storageAccountName string) error {
 | |
| 	var err error
 | |
| 	var blobSvc azstorage.BlobStorageClient
 | |
| 
 | |
| 	// short circut the check via local cache
 | |
| 	// we are forgiving the fact that account may not be in cache yet
 | |
| 	if v, ok := c.accounts[storageAccountName]; ok && v.defaultContainerCreated {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	// not cached, check existance and readiness
 | |
| 	bExist, provisionState, _ := c.getStorageAccountState(storageAccountName)
 | |
| 
 | |
| 	// account does not exist
 | |
| 	if !bExist {
 | |
| 		return fmt.Errorf("azureDisk - account %s does not exist while trying to create/ensure default container", storageAccountName)
 | |
| 	}
 | |
| 
 | |
| 	// account exists but not ready yet
 | |
| 	if provisionState != storage.Succeeded {
 | |
| 		// we don't want many attempts to validate the account readiness
 | |
| 		// here hence we are locking
 | |
| 		counter := 1
 | |
| 		for swapped := atomic.CompareAndSwapInt32(&c.accounts[storageAccountName].isValidating, 0, 1); swapped != true; {
 | |
| 			time.Sleep(3 * time.Second)
 | |
| 			counter = counter + 1
 | |
| 			// check if we passed the max sleep
 | |
| 			if counter >= 20 {
 | |
| 				return fmt.Errorf("azureDisk - timeout waiting to aquire lock to validate account:%s readiness", storageAccountName)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// swapped
 | |
| 		defer func() {
 | |
| 			c.accounts[storageAccountName].isValidating = 0
 | |
| 		}()
 | |
| 
 | |
| 		// short circut the check again.
 | |
| 		if v, ok := c.accounts[storageAccountName]; ok && v.defaultContainerCreated {
 | |
| 			return nil
 | |
| 		}
 | |
| 
 | |
| 		err = kwait.ExponentialBackoff(defaultBackOff, func() (bool, error) {
 | |
| 			_, provisionState, err := c.getStorageAccountState(storageAccountName)
 | |
| 
 | |
| 			if err != nil {
 | |
| 				glog.V(4).Infof("azureDisk - GetStorageAccount:%s err %s", storageAccountName, err.Error())
 | |
| 				return false, err
 | |
| 			}
 | |
| 
 | |
| 			if provisionState == storage.Succeeded {
 | |
| 				return true, nil
 | |
| 			}
 | |
| 
 | |
| 			glog.V(4).Infof("azureDisk - GetStorageAccount:%s not ready yet", storageAccountName)
 | |
| 			// leave it for next loop/sync loop
 | |
| 			return false, fmt.Errorf("azureDisk - Account %s has not been flagged Succeeded by ARM", storageAccountName)
 | |
| 		})
 | |
| 		// we have failed to ensure that account is ready for us to create
 | |
| 		// the default vhd container
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if blobSvc, err = c.getBlobSvcClient(storageAccountName); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	container := blobSvc.GetContainerReference(defaultContainerName)
 | |
| 	bCreated, err := container.CreateIfNotExists(&azstorage.CreateContainerOptions{Access: azstorage.ContainerAccessTypePrivate})
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if bCreated {
 | |
| 		glog.V(2).Infof("azureDisk - storage account:%s had no default container(%s) and it was created \n", storageAccountName, defaultContainerName)
 | |
| 	}
 | |
| 
 | |
| 	// flag so we no longer have to check on ARM
 | |
| 	c.accounts[storageAccountName].defaultContainerCreated = true
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Gets Disk counts per storage account
 | |
| func (c *BlobDiskController) getDiskCount(SAName string) (int, error) {
 | |
| 	// if we have it in cache
 | |
| 	if c.accounts[SAName].diskCount != -1 {
 | |
| 		return int(c.accounts[SAName].diskCount), nil
 | |
| 	}
 | |
| 
 | |
| 	var err error
 | |
| 	var blobSvc azstorage.BlobStorageClient
 | |
| 
 | |
| 	if err = c.ensureDefaultContainer(SAName); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	if blobSvc, err = c.getBlobSvcClient(SAName); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	params := azstorage.ListBlobsParameters{}
 | |
| 
 | |
| 	container := blobSvc.GetContainerReference(defaultContainerName)
 | |
| 	response, err := container.ListBlobs(params)
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	glog.V(4).Infof("azure-Disk -  refreshed data count for account %s and found %v", SAName, len(response.Blobs))
 | |
| 	c.accounts[SAName].diskCount = int32(len(response.Blobs))
 | |
| 
 | |
| 	return int(c.accounts[SAName].diskCount), nil
 | |
| }
 | |
| 
 | |
| // shouldInit ensures that we only init the plugin once
 | |
| // and we only do that in the controller
 | |
| 
 | |
| func (c *BlobDiskController) shouldInit() bool {
 | |
| 	if os.Args[0] == "kube-controller-manager" || (os.Args[0] == "/hyperkube" && os.Args[1] == "controller-manager") {
 | |
| 		swapped := atomic.CompareAndSwapInt64(&initFlag, 0, 1)
 | |
| 		if swapped {
 | |
| 			return true
 | |
| 		}
 | |
| 	}
 | |
| 	return false
 | |
| }
 | |
| 
 | |
| func (c *BlobDiskController) getAllStorageAccounts() (map[string]*storageAccountState, error) {
 | |
| 	accountListResult, err := c.common.cloud.StorageAccountClient.List()
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if accountListResult.Value == nil {
 | |
| 		return nil, fmt.Errorf("azureDisk - empty accountListResult")
 | |
| 	}
 | |
| 
 | |
| 	accounts := make(map[string]*storageAccountState)
 | |
| 	for _, v := range *accountListResult.Value {
 | |
| 		if strings.Index(*v.Name, storageAccountNameMatch) != 0 {
 | |
| 			continue
 | |
| 		}
 | |
| 		if v.Name == nil || v.Sku == nil {
 | |
| 			glog.Infof("azureDisk - accountListResult Name or Sku is nil")
 | |
| 			continue
 | |
| 		}
 | |
| 		glog.Infof("azureDisk - identified account %s as part of shared PVC accounts", *v.Name)
 | |
| 
 | |
| 		sastate := &storageAccountState{
 | |
| 			name:      *v.Name,
 | |
| 			saType:    (*v.Sku).Name,
 | |
| 			diskCount: -1,
 | |
| 		}
 | |
| 
 | |
| 		accounts[*v.Name] = sastate
 | |
| 	}
 | |
| 
 | |
| 	return accounts, nil
 | |
| }
 | |
| 
 | |
| func (c *BlobDiskController) createStorageAccount(storageAccountName string, storageAccountType storage.SkuName, location string, checkMaxAccounts bool) error {
 | |
| 	bExist, _, _ := c.getStorageAccountState(storageAccountName)
 | |
| 	if bExist {
 | |
| 		newAccountState := &storageAccountState{
 | |
| 			diskCount: -1,
 | |
| 			saType:    storageAccountType,
 | |
| 			name:      storageAccountName,
 | |
| 		}
 | |
| 
 | |
| 		c.addAccountState(storageAccountName, newAccountState)
 | |
| 	}
 | |
| 	// Account Does not exist
 | |
| 	if !bExist {
 | |
| 		if len(c.accounts) == maxStorageAccounts && checkMaxAccounts {
 | |
| 			return fmt.Errorf("azureDisk - can not create new storage account, current storage accounts count:%v Max is:%v", len(c.accounts), maxStorageAccounts)
 | |
| 		}
 | |
| 
 | |
| 		glog.V(2).Infof("azureDisk - Creating storage account %s type %s \n", storageAccountName, string(storageAccountType))
 | |
| 
 | |
| 		cp := storage.AccountCreateParameters{
 | |
| 			Sku:      &storage.Sku{Name: storageAccountType},
 | |
| 			Tags:     &map[string]*string{"created-by": to.StringPtr("azure-dd")},
 | |
| 			Location: &location}
 | |
| 		cancel := make(chan struct{})
 | |
| 
 | |
| 		_, errChan := c.common.cloud.StorageAccountClient.Create(c.common.resourceGroup, storageAccountName, cp, cancel)
 | |
| 		err := <-errChan
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf(fmt.Sprintf("Create Storage Account: %s, error: %s", storageAccountName, err))
 | |
| 		}
 | |
| 
 | |
| 		newAccountState := &storageAccountState{
 | |
| 			diskCount: -1,
 | |
| 			saType:    storageAccountType,
 | |
| 			name:      storageAccountName,
 | |
| 		}
 | |
| 
 | |
| 		c.addAccountState(storageAccountName, newAccountState)
 | |
| 	}
 | |
| 
 | |
| 	if !bExist {
 | |
| 		// SA Accounts takes time to be provisioned
 | |
| 		// so if this account was just created allow it sometime
 | |
| 		// before polling
 | |
| 		glog.V(2).Infof("azureDisk - storage account %s was just created, allowing time before polling status")
 | |
| 		time.Sleep(25 * time.Second) // as observed 25 is the average time for SA to be provisioned
 | |
| 	}
 | |
| 
 | |
| 	// finally, make sure that we default container is created
 | |
| 	// before handing it back over
 | |
| 	return c.ensureDefaultContainer(storageAccountName)
 | |
| }
 | |
| 
 | |
| // finds a new suitable storageAccount for this disk
 | |
| func (c *BlobDiskController) findSANameForDisk(storageAccountType storage.SkuName) (string, error) {
 | |
| 	maxDiskCount := maxDisksPerStorageAccounts
 | |
| 	SAName := ""
 | |
| 	totalDiskCounts := 0
 | |
| 	countAccounts := 0 // account of this type.
 | |
| 	for _, v := range c.accounts {
 | |
| 		// filter out any stand-alone disks/accounts
 | |
| 		if strings.Index(v.name, storageAccountNameMatch) != 0 {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		// note: we compute avge stratified by type.
 | |
| 		// this to enable user to grow per SA type to avoid low
 | |
| 		//avg utilization on one account type skewing all data.
 | |
| 
 | |
| 		if v.saType == storageAccountType {
 | |
| 			// compute average
 | |
| 			dCount, err := c.getDiskCount(v.name)
 | |
| 			if err != nil {
 | |
| 				return "", err
 | |
| 			}
 | |
| 			totalDiskCounts = totalDiskCounts + dCount
 | |
| 			countAccounts = countAccounts + 1
 | |
| 			// empty account
 | |
| 			if dCount == 0 {
 | |
| 				glog.V(2).Infof("azureDisk - account %s identified for a new disk  is because it has 0 allocated disks", v.name)
 | |
| 				return v.name, nil // shortcircut, avg is good and no need to adjust
 | |
| 			}
 | |
| 			// if this account is less allocated
 | |
| 			if dCount < maxDiskCount {
 | |
| 				maxDiskCount = dCount
 | |
| 				SAName = v.name
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// if we failed to find storageaccount
 | |
| 	if SAName == "" {
 | |
| 		glog.V(2).Infof("azureDisk - failed to identify a suitable account for new disk and will attempt to create new account")
 | |
| 		SAName = getAccountNameForNum(c.getNextAccountNum())
 | |
| 		err := c.createStorageAccount(SAName, storageAccountType, c.common.location, true)
 | |
| 		if err != nil {
 | |
| 			return "", err
 | |
| 		}
 | |
| 		return SAName, nil
 | |
| 	}
 | |
| 
 | |
| 	disksAfter := totalDiskCounts + 1 // with the new one!
 | |
| 
 | |
| 	avgUtilization := float64(disksAfter) / float64(countAccounts*maxDisksPerStorageAccounts)
 | |
| 	aboveAvg := (avgUtilization > storageAccountUtilizationBeforeGrowing)
 | |
| 
 | |
| 	// avg are not create and we should craete more accounts if we can
 | |
| 	if aboveAvg && countAccounts < maxStorageAccounts {
 | |
| 		glog.V(2).Infof("azureDisk - shared storageAccounts utilzation(%v) >  grow-at-avg-utilization (%v). New storage account will be created", avgUtilization, storageAccountUtilizationBeforeGrowing)
 | |
| 		SAName = getAccountNameForNum(c.getNextAccountNum())
 | |
| 		err := c.createStorageAccount(SAName, storageAccountType, c.common.location, true)
 | |
| 		if err != nil {
 | |
| 			return "", err
 | |
| 		}
 | |
| 		return SAName, nil
 | |
| 	}
 | |
| 
 | |
| 	// avergates are not ok and we are at capacity(max storage accounts allowed)
 | |
| 	if aboveAvg && countAccounts == maxStorageAccounts {
 | |
| 		glog.Infof("azureDisk - shared storageAccounts utilzation(%v) > grow-at-avg-utilization (%v). But k8s maxed on SAs for PVC(%v). k8s will now exceed grow-at-avg-utilization without adding accounts",
 | |
| 			avgUtilization, storageAccountUtilizationBeforeGrowing, maxStorageAccounts)
 | |
| 	}
 | |
| 
 | |
| 	// we found a  storage accounts && [ avg are ok || we reached max sa count ]
 | |
| 	return SAName, nil
 | |
| }
 | |
| func (c *BlobDiskController) getNextAccountNum() int {
 | |
| 	max := 0
 | |
| 
 | |
| 	for k := range c.accounts {
 | |
| 		// filter out accounts that are for standalone
 | |
| 		if strings.Index(k, storageAccountNameMatch) != 0 {
 | |
| 			continue
 | |
| 		}
 | |
| 		num := getAccountNumFromName(k)
 | |
| 		if num > max {
 | |
| 			max = num
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return max + 1
 | |
| }
 | |
| 
 | |
| func (c *BlobDiskController) deleteStorageAccount(storageAccountName string) error {
 | |
| 	resp, err := c.common.cloud.StorageAccountClient.Delete(c.common.resourceGroup, storageAccountName)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("azureDisk - Delete of storage account '%s' failed with status %s...%v", storageAccountName, resp.Status, err)
 | |
| 	}
 | |
| 
 | |
| 	c.removeAccountState(storageAccountName)
 | |
| 
 | |
| 	glog.Infof("azureDisk - Storage Account %s was deleted", storageAccountName)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| //Gets storage account exist, provisionStatus, Error if any
 | |
| func (c *BlobDiskController) getStorageAccountState(storageAccountName string) (bool, storage.ProvisioningState, error) {
 | |
| 	account, err := c.common.cloud.StorageAccountClient.GetProperties(c.common.resourceGroup, storageAccountName)
 | |
| 	if err != nil {
 | |
| 		return false, "", err
 | |
| 	}
 | |
| 	return true, account.AccountProperties.ProvisioningState, nil
 | |
| }
 | |
| 
 | |
| func (c *BlobDiskController) addAccountState(key string, state *storageAccountState) {
 | |
| 	accountsLock.Lock()
 | |
| 	defer accountsLock.Unlock()
 | |
| 
 | |
| 	if _, ok := c.accounts[key]; !ok {
 | |
| 		c.accounts[key] = state
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *BlobDiskController) removeAccountState(key string) {
 | |
| 	accountsLock.Lock()
 | |
| 	defer accountsLock.Unlock()
 | |
| 	delete(c.accounts, key)
 | |
| }
 | |
| 
 | |
| // pads account num with zeros as needed
 | |
| func getAccountNameForNum(num int) string {
 | |
| 	sNum := strconv.Itoa(num)
 | |
| 	missingZeros := 3 - len(sNum)
 | |
| 	strZero := ""
 | |
| 	for missingZeros > 0 {
 | |
| 		strZero = strZero + "0"
 | |
| 		missingZeros = missingZeros - 1
 | |
| 	}
 | |
| 
 | |
| 	sNum = strZero + sNum
 | |
| 	return fmt.Sprintf(storageAccountNamePrefix, sNum)
 | |
| }
 | |
| 
 | |
| func getAccountNumFromName(accountName string) int {
 | |
| 	nameLen := len(accountName)
 | |
| 	num, _ := strconv.Atoi(accountName[nameLen-3:])
 | |
| 
 | |
| 	return num
 | |
| }
 | |
| 
 | |
| func createVHDHeader(size uint64) ([]byte, error) {
 | |
| 	h := vhd.CreateFixedHeader(size, &vhd.VHDOptions{})
 | |
| 	b := new(bytes.Buffer)
 | |
| 	err := binary.Write(b, binary.BigEndian, h)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return b.Bytes(), nil
 | |
| }
 | |
| 
 | |
| func diskNameandSANameFromURI(diskURI string) (string, string, error) {
 | |
| 	uri, err := url.Parse(diskURI)
 | |
| 	if err != nil {
 | |
| 		return "", "", err
 | |
| 	}
 | |
| 
 | |
| 	hostName := uri.Host
 | |
| 	storageAccountName := strings.Split(hostName, ".")[0]
 | |
| 
 | |
| 	segments := strings.Split(uri.Path, "/")
 | |
| 	diskNameVhd := segments[len(segments)-1]
 | |
| 
 | |
| 	return storageAccountName, diskNameVhd, nil
 | |
| }
 |