diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index 4a3b2a48a4e..cb5b6818917 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -81,12 +81,15 @@ type LoadBalancer interface { // GetLoadBalancer returns whether the specified load balancer exists, and // if so, what its status is. // Implementations must treat the *api.Service parameter as read-only and not modify it. + // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager GetLoadBalancer(clusterName string, service *api.Service) (status *api.LoadBalancerStatus, exists bool, err error) // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer // Implementations must treat the *api.Service parameter as read-only and not modify it. + // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager EnsureLoadBalancer(clusterName string, service *api.Service, hosts []string) (*api.LoadBalancerStatus, error) // UpdateLoadBalancer updates hosts under the specified load balancer. // Implementations must treat the *api.Service parameter as read-only and not modify it. + // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager UpdateLoadBalancer(clusterName string, service *api.Service, hosts []string) error // EnsureLoadBalancerDeleted deletes the specified load balancer if it // exists, returning nil if the load balancer specified either didn't exist or @@ -95,6 +98,7 @@ type LoadBalancer interface { // have multiple underlying components, meaning a Get could say that the LB // doesn't exist even if some part of it is still laying around. // Implementations must treat the *api.Service parameter as read-only and not modify it. + // Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager EnsureLoadBalancerDeleted(clusterName string, service *api.Service) error } diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index b56d019e5fb..c7a52ba7a5a 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -37,6 +37,7 @@ import ( ) const TestClusterId = "clusterid.test" +const TestClusterName = "testCluster" func TestReadAWSCloudConfig(t *testing.T) { tests := []struct { @@ -1182,7 +1183,7 @@ func TestDescribeLoadBalancerOnDelete(t *testing.T) { c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices) awsServices.elb.expectDescribeLoadBalancers("aid") - c.EnsureLoadBalancerDeleted(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}) + c.EnsureLoadBalancerDeleted(TestClusterName, &api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}) } func TestDescribeLoadBalancerOnUpdate(t *testing.T) { @@ -1190,7 +1191,7 @@ func TestDescribeLoadBalancerOnUpdate(t *testing.T) { c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices) awsServices.elb.expectDescribeLoadBalancers("aid") - c.UpdateLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{}) + c.UpdateLoadBalancer(TestClusterName, &api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{}) } func TestDescribeLoadBalancerOnGet(t *testing.T) { @@ -1198,7 +1199,7 @@ func TestDescribeLoadBalancerOnGet(t *testing.T) { c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices) awsServices.elb.expectDescribeLoadBalancers("aid") - c.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}) + c.GetLoadBalancer(TestClusterName, &api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}) } func TestDescribeLoadBalancerOnEnsure(t *testing.T) { @@ -1206,7 +1207,7 @@ func TestDescribeLoadBalancerOnEnsure(t *testing.T) { c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices) awsServices.elb.expectDescribeLoadBalancers("aid") - c.EnsureLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{}) + c.EnsureLoadBalancer(TestClusterName, &api.Service{ObjectMeta: api.ObjectMeta{Name: "myservice", UID: "id"}}, []string{}) } func TestBuildListener(t *testing.T) { diff --git a/pkg/cloudprovider/providers/azure/OWNERS b/pkg/cloudprovider/providers/azure/OWNERS new file mode 100644 index 00000000000..120b921b3cf --- /dev/null +++ b/pkg/cloudprovider/providers/azure/OWNERS @@ -0,0 +1,3 @@ +assignees: + - colemickens + - brendandburns diff --git a/pkg/cloudprovider/providers/azure/azure.go b/pkg/cloudprovider/providers/azure/azure.go new file mode 100644 index 00000000000..8fefc76a652 --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure.go @@ -0,0 +1,173 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "io" + "io/ioutil" + + "k8s.io/kubernetes/pkg/cloudprovider" + + "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest/azure" + "github.com/ghodss/yaml" +) + +// CloudProviderName is the value used for the --cloud-provider flag +const CloudProviderName = "azure" + +// Config holds the configuration parsed from the --cloud-config flag +type Config struct { + Cloud string `json:"cloud" yaml:"cloud"` + TenantID string `json:"tenantId" yaml:"tenantId"` + SubscriptionID string `json:"subscriptionId" yaml:"subscriptionId"` + ResourceGroup string `json:"resourceGroup" yaml:"resourceGroup"` + Location string `json:"location" yaml:"location"` + VnetName string `json:"vnetName" yaml:"vnetName"` + SubnetName string `json:"subnetName" yaml:"subnetName"` + SecurityGroupName string `json:"securityGroupName" yaml:"securityGroupName"` + RouteTableName string `json:"routeTableName" yaml:"routeTableName"` + + AADClientID string `json:"aadClientId" yaml:"aadClientId"` + AADClientSecret string `json:"aadClientSecret" yaml:"aadClientSecret"` + AADTenantID string `json:"aadTenantId" yaml:"aadTenantId"` +} + +// Cloud holds the config and clients +type Cloud struct { + Config + Environment azure.Environment + RoutesClient network.RoutesClient + SubnetsClient network.SubnetsClient + InterfacesClient network.InterfacesClient + RouteTablesClient network.RouteTablesClient + LoadBalancerClient network.LoadBalancersClient + PublicIPAddressesClient network.PublicIPAddressesClient + SecurityGroupsClient network.SecurityGroupsClient + VirtualMachinesClient compute.VirtualMachinesClient +} + +func init() { + cloudprovider.RegisterCloudProvider(CloudProviderName, NewCloud) +} + +// NewCloud returns a Cloud with initialized clients +func NewCloud(configReader io.Reader) (cloudprovider.Interface, error) { + var az Cloud + + configContents, err := ioutil.ReadAll(configReader) + if err != nil { + return nil, err + } + err = yaml.Unmarshal(configContents, &az) + if err != nil { + return nil, err + } + + if az.Cloud == "" { + az.Environment = azure.PublicCloud + } else { + az.Environment, err = azure.EnvironmentFromName(az.Cloud) + if err != nil { + return nil, err + } + } + + oauthConfig, err := az.Environment.OAuthConfigForTenant(az.TenantID) + if err != nil { + return nil, err + } + + servicePrincipalToken, err := azure.NewServicePrincipalToken( + *oauthConfig, + az.AADClientID, + az.AADClientSecret, + az.Environment.ServiceManagementEndpoint) + if err != nil { + return nil, err + } + + az.SubnetsClient = network.NewSubnetsClient(az.SubscriptionID) + az.SubnetsClient.BaseURI = az.Environment.ResourceManagerEndpoint + az.SubnetsClient.Authorizer = servicePrincipalToken + + az.RouteTablesClient = network.NewRouteTablesClient(az.SubscriptionID) + az.RouteTablesClient.BaseURI = az.Environment.ResourceManagerEndpoint + az.RouteTablesClient.Authorizer = servicePrincipalToken + + az.RoutesClient = network.NewRoutesClient(az.SubscriptionID) + az.RoutesClient.BaseURI = az.Environment.ResourceManagerEndpoint + az.RoutesClient.Authorizer = servicePrincipalToken + + az.InterfacesClient = network.NewInterfacesClient(az.SubscriptionID) + az.InterfacesClient.BaseURI = az.Environment.ResourceManagerEndpoint + az.InterfacesClient.Authorizer = servicePrincipalToken + + az.LoadBalancerClient = network.NewLoadBalancersClient(az.SubscriptionID) + az.LoadBalancerClient.BaseURI = az.Environment.ResourceManagerEndpoint + az.LoadBalancerClient.Authorizer = servicePrincipalToken + + az.VirtualMachinesClient = compute.NewVirtualMachinesClient(az.SubscriptionID) + az.VirtualMachinesClient.BaseURI = az.Environment.ResourceManagerEndpoint + az.VirtualMachinesClient.Authorizer = servicePrincipalToken + + az.PublicIPAddressesClient = network.NewPublicIPAddressesClient(az.SubscriptionID) + az.PublicIPAddressesClient.BaseURI = az.Environment.ResourceManagerEndpoint + az.PublicIPAddressesClient.Authorizer = servicePrincipalToken + + az.SecurityGroupsClient = network.NewSecurityGroupsClient(az.SubscriptionID) + az.SecurityGroupsClient.BaseURI = az.Environment.ResourceManagerEndpoint + az.SecurityGroupsClient.Authorizer = servicePrincipalToken + + return &az, nil +} + +// LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise. +func (az *Cloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) { + return az, true +} + +// Instances returns an instances interface. Also returns true if the interface is supported, false otherwise. +func (az *Cloud) Instances() (cloudprovider.Instances, bool) { + return az, true +} + +// Zones returns a zones interface. Also returns true if the interface is supported, false otherwise. +func (az *Cloud) Zones() (cloudprovider.Zones, bool) { + return az, true +} + +// Clusters returns a clusters interface. Also returns true if the interface is supported, false otherwise. +func (az *Cloud) Clusters() (cloudprovider.Clusters, bool) { + return nil, false +} + +// Routes returns a routes interface along with whether the interface is supported. +func (az *Cloud) Routes() (cloudprovider.Routes, bool) { + return az, true +} + +// ScrubDNS provides an opportunity for cloud-provider-specific code to process DNS settings for pods. +func (az *Cloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []string) { + return nameservers, searches +} + +// ProviderName returns the cloud provider ID. +func (az *Cloud) ProviderName() string { + return CloudProviderName +} diff --git a/pkg/cloudprovider/providers/azure/azure_instances.go b/pkg/cloudprovider/providers/azure/azure_instances.go new file mode 100644 index 00000000000..b80adb00c34 --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_instances.go @@ -0,0 +1,146 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "regexp" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/cloudprovider" + + "github.com/Azure/azure-sdk-for-go/arm/compute" +) + +// NodeAddresses returns the addresses of the specified instance. +func (az *Cloud) NodeAddresses(name string) ([]api.NodeAddress, error) { + ip, err := az.getIPForMachine(name) + if err != nil { + return nil, err + } + + return []api.NodeAddress{ + {Type: api.NodeInternalIP, Address: ip}, + {Type: api.NodeHostName, Address: name}, + }, nil +} + +// ExternalID returns the cloud provider ID of the specified instance (deprecated). +func (az *Cloud) ExternalID(name string) (string, error) { + return az.InstanceID(name) +} + +// InstanceID returns the cloud provider ID of the specified instance. +// Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound) +func (az *Cloud) InstanceID(name string) (string, error) { + machine, exists, err := az.getVirtualMachine(name) + if err != nil { + return "", err + } else if !exists { + return "", cloudprovider.InstanceNotFound + } + return *machine.ID, nil +} + +// InstanceType returns the type of the specified instance. +// Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound) +// (Implementer Note): This is used by kubelet. Kubelet will label the node. Real log from kubelet: +// Adding node label from cloud provider: beta.kubernetes.io/instance-type=[value] +func (az *Cloud) InstanceType(name string) (string, error) { + machine, exists, err := az.getVirtualMachine(name) + if err != nil { + return "", err + } else if !exists { + return "", cloudprovider.InstanceNotFound + } + return string(machine.Properties.HardwareProfile.VMSize), nil +} + +// List lists instances that match 'filter' which is a regular expression which must match the entire instance name (fqdn) +func (az *Cloud) List(filter string) ([]string, error) { + allNodes, err := az.listAllNodesInResourceGroup() + if err != nil { + return nil, err + } + + filteredNodes, err := filterNodes(allNodes, filter) + if err != nil { + return nil, err + } + + nodeNames := make([]string, len(filteredNodes)) + for i, v := range filteredNodes { + nodeNames[i] = *v.Name + } + + return nodeNames, nil +} + +// AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances +// expected format for the key is standard ssh-keygen format: +func (az *Cloud) AddSSHKeyToAllInstances(user string, keyData []byte) error { + return fmt.Errorf("not supported") +} + +// CurrentNodeName returns the name of the node we are currently running on +// On most clouds (e.g. GCE) this is the hostname, so we provide the hostname +func (az *Cloud) CurrentNodeName(hostname string) (string, error) { + return hostname, nil +} + +func (az *Cloud) listAllNodesInResourceGroup() ([]compute.VirtualMachine, error) { + allNodes := []compute.VirtualMachine{} + + result, err := az.VirtualMachinesClient.List(az.ResourceGroup) + if err != nil { + return nil, err + } + + morePages := (result.Value != nil && len(*result.Value) > 1) + + for morePages { + allNodes = append(allNodes, *result.Value...) + + result, err = az.VirtualMachinesClient.ListAllNextResults(result) + if err != nil { + return nil, err + } + + morePages = (result.Value != nil && len(*result.Value) > 1) + } + + return allNodes, nil + +} + +func filterNodes(nodes []compute.VirtualMachine, filter string) ([]compute.VirtualMachine, error) { + filteredNodes := []compute.VirtualMachine{} + + re, err := regexp.Compile(filter) + if err != nil { + return nil, err + } + + for _, node := range nodes { + // search tags + if re.MatchString(*node.Name) { + filteredNodes = append(filteredNodes, node) + } + } + + return filteredNodes, nil +} diff --git a/pkg/cloudprovider/providers/azure/azure_loadbalancer.go b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go new file mode 100644 index 00000000000..3d04c5f003c --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_loadbalancer.go @@ -0,0 +1,611 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "strconv" + "strings" + + "k8s.io/kubernetes/pkg/api" + utilerrors "k8s.io/kubernetes/pkg/util/errors" + + "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest/to" + "github.com/golang/glog" +) + +// GetLoadBalancer returns whether the specified load balancer exists, and +// if so, what its status is. +func (az *Cloud) GetLoadBalancer(clusterName string, service *api.Service) (status *api.LoadBalancerStatus, exists bool, err error) { + lbName := getLoadBalancerName(clusterName) + pipName := getPublicIPName(clusterName, service) + serviceName := getServiceName(service) + + _, existsLb, err := az.getAzureLoadBalancer(lbName) + if err != nil { + return nil, false, err + } + if !existsLb { + glog.V(5).Infof("get(%s): lb(%s) - doesn't exist", serviceName, pipName) + return nil, false, nil + } + + pip, existsPip, err := az.getPublicIPAddress(pipName) + if err != nil { + return nil, false, err + } + if !existsPip { + glog.V(5).Infof("get(%s): pip(%s) - doesn't exist", serviceName, pipName) + return nil, false, nil + } + + return &api.LoadBalancerStatus{ + Ingress: []api.LoadBalancerIngress{{IP: *pip.Properties.IPAddress}}, + }, true, nil +} + +// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer +func (az *Cloud) EnsureLoadBalancer(clusterName string, service *api.Service, hosts []string) (*api.LoadBalancerStatus, error) { + lbName := getLoadBalancerName(clusterName) + pipName := getPublicIPName(clusterName, service) + serviceName := getServiceName(service) + glog.V(2).Infof("ensure(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName) + + pip, err := az.ensurePublicIPExists(serviceName, pipName) + if err != nil { + return nil, err + } + + sg, err := az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "") + if err != nil { + return nil, err + } + sg, sgNeedsUpdate, err := az.reconcileSecurityGroup(sg, clusterName, service) + if err != nil { + return nil, err + } + if sgNeedsUpdate { + glog.V(3).Infof("ensure(%s): sg(%s) - updating", serviceName, *sg.Name) + _, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *sg.Name, sg, nil) + if err != nil { + return nil, err + } + } + + lb, existsLb, err := az.getAzureLoadBalancer(lbName) + if err != nil { + return nil, err + } + if !existsLb { + lb = network.LoadBalancer{ + Name: &lbName, + Location: &az.Location, + Properties: &network.LoadBalancerPropertiesFormat{}, + } + } + + lb, lbNeedsUpdate, err := az.reconcileLoadBalancer(lb, pip, clusterName, service, hosts) + if err != nil { + return nil, err + } + if !existsLb || lbNeedsUpdate { + glog.V(3).Infof("ensure(%s): lb(%s) - updating", serviceName, lbName) + _, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + if err != nil { + return nil, err + } + } + + // Add the machines to the backend pool if they're not already + lbBackendName := getBackendPoolName(clusterName) + lbBackendPoolID := az.getBackendPoolID(lbName, lbBackendName) + hostUpdates := make([]func() error, len(hosts)) + for i, host := range hosts { + localHost := host + f := func() error { + err := az.ensureHostInPool(serviceName, localHost, lbBackendPoolID) + if err != nil { + return fmt.Errorf("ensure(%s): lb(%s) - failed to ensure host in pool: %q", serviceName, lbName, err) + } + return nil + } + hostUpdates[i] = f + } + + errs := utilerrors.AggregateGoroutines(hostUpdates...) + if errs != nil { + return nil, utilerrors.Flatten(errs) + } + + glog.V(2).Infof("ensure(%s): FINISH - %s", service.Name, *pip.Properties.IPAddress) + return &api.LoadBalancerStatus{ + Ingress: []api.LoadBalancerIngress{{IP: *pip.Properties.IPAddress}}, + }, nil +} + +// UpdateLoadBalancer updates hosts under the specified load balancer. +func (az *Cloud) UpdateLoadBalancer(clusterName string, service *api.Service, hosts []string) error { + _, err := az.EnsureLoadBalancer(clusterName, service, hosts) + return err +} + +// EnsureLoadBalancerDeleted deletes the specified load balancer if it +// exists, returning nil if the load balancer specified either didn't exist or +// was successfully deleted. +// This construction is useful because many cloud providers' load balancers +// have multiple underlying components, meaning a Get could say that the LB +// doesn't exist even if some part of it is still laying around. +func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *api.Service) error { + lbName := getLoadBalancerName(clusterName) + pipName := getPublicIPName(clusterName, service) + serviceName := getServiceName(service) + + glog.V(2).Infof("delete(%s): START clusterName=%q lbName=%q", serviceName, clusterName, lbName) + + // reconcile logic is capable of fully reconcile, so we can use this to delete + service.Spec.Ports = []api.ServicePort{} + + lb, existsLb, err := az.getAzureLoadBalancer(lbName) + if err != nil { + return err + } + if existsLb { + lb, lbNeedsUpdate, reconcileErr := az.reconcileLoadBalancer(lb, nil, clusterName, service, []string{}) + if reconcileErr != nil { + return reconcileErr + } + if lbNeedsUpdate { + if len(*lb.Properties.FrontendIPConfigurations) > 0 { + glog.V(3).Infof("delete(%s): lb(%s) - updating", serviceName, lbName) + _, err = az.LoadBalancerClient.CreateOrUpdate(az.ResourceGroup, *lb.Name, lb, nil) + if err != nil { + return err + } + } else { + glog.V(3).Infof("delete(%s): lb(%s) - deleting; no remaining frontendipconfigs", serviceName, lbName) + + _, err = az.LoadBalancerClient.Delete(az.ResourceGroup, lbName, nil) + if err != nil { + return err + } + } + } + } + + sg, existsSg, err := az.getSecurityGroup() + if err != nil { + return err + } + if existsSg { + reconciledSg, sgNeedsUpdate, reconcileErr := az.reconcileSecurityGroup(sg, clusterName, service) + if reconcileErr != nil { + return reconcileErr + } + if sgNeedsUpdate { + glog.V(3).Infof("delete(%s): sg(%s) - updating", serviceName, az.SecurityGroupName) + _, err := az.SecurityGroupsClient.CreateOrUpdate(az.ResourceGroup, *reconciledSg.Name, reconciledSg, nil) + if err != nil { + return err + } + } + } + + err = az.ensurePublicIPDeleted(serviceName, pipName) + if err != nil { + return err + } + + glog.V(2).Infof("delete(%s): FINISH", serviceName) + return nil +} + +func (az *Cloud) ensurePublicIPExists(serviceName, pipName string) (*network.PublicIPAddress, error) { + pip, existsPip, err := az.getPublicIPAddress(pipName) + if err != nil { + return nil, err + } + if existsPip { + return &pip, nil + } + + pip.Name = to.StringPtr(pipName) + pip.Location = to.StringPtr(az.Location) + pip.Properties = &network.PublicIPAddressPropertiesFormat{ + PublicIPAllocationMethod: network.Static, + } + pip.Tags = &map[string]*string{"service": &serviceName} + + glog.V(3).Infof("ensure(%s): pip(%s) - creating", serviceName, *pip.Name) + _, err = az.PublicIPAddressesClient.CreateOrUpdate(az.ResourceGroup, *pip.Name, pip, nil) + if err != nil { + return nil, err + } + + pip, err = az.PublicIPAddressesClient.Get(az.ResourceGroup, *pip.Name, "") + if err != nil { + return nil, err + } + + return &pip, nil + +} + +func (az *Cloud) ensurePublicIPDeleted(serviceName, pipName string) error { + _, deleteErr := az.PublicIPAddressesClient.Delete(az.ResourceGroup, pipName, nil) + _, realErr := checkResourceExistsFromError(deleteErr) + if realErr != nil { + return nil + } + return nil +} + +// This ensures load balancer exists and the frontend ip config is setup. +// This also reconciles the Service's Ports with the LoadBalancer config. +// This entails adding rules/probes for expected Ports and removing stale rules/ports. +func (az *Cloud) reconcileLoadBalancer(lb network.LoadBalancer, pip *network.PublicIPAddress, clusterName string, service *api.Service, hosts []string) (network.LoadBalancer, bool, error) { + lbName := getLoadBalancerName(clusterName) + serviceName := getServiceName(service) + lbFrontendIPConfigName := getFrontendIPConfigName(service) + lbFrontendIPConfigID := az.getFrontendIPConfigID(lbName, lbFrontendIPConfigName) + lbBackendPoolName := getBackendPoolName(clusterName) + lbBackendPoolID := az.getBackendPoolID(lbName, lbBackendPoolName) + + wantLb := len(service.Spec.Ports) > 0 + dirtyLb := false + + // Ensure LoadBalancer's Backend Pool Configuration + if wantLb { + if lb.Properties.BackendAddressPools == nil || + len(*lb.Properties.BackendAddressPools) == 0 { + lb.Properties.BackendAddressPools = &[]network.BackendAddressPool{ + { + Name: to.StringPtr(lbBackendPoolName), + }, + } + glog.V(10).Infof("reconcile(%s)(%t): lb backendpool - adding", serviceName, wantLb) + dirtyLb = true + } else if len(*lb.Properties.BackendAddressPools) != 1 || + !strings.EqualFold(*(*lb.Properties.BackendAddressPools)[0].Name, lbBackendPoolName) { + return lb, false, fmt.Errorf("loadbalancer is misconfigured with a different backend pool") + } + } + + // Ensure LoadBalancer's Frontend IP Configurations + dirtyConfigs := false + newConfigs := []network.FrontendIPConfiguration{} + if lb.Properties.FrontendIPConfigurations != nil { + newConfigs = *lb.Properties.FrontendIPConfigurations + } + if !wantLb { + for i := len(newConfigs) - 1; i >= 0; i-- { + config := newConfigs[i] + if strings.EqualFold(*config.Name, lbFrontendIPConfigName) { + glog.V(3).Infof("reconcile(%s)(%t): lb frontendconfig(%s) - dropping", serviceName, wantLb, lbFrontendIPConfigName) + newConfigs = append(newConfigs[:i], newConfigs[i+1:]...) + dirtyConfigs = true + } + } + } else { + foundConfig := false + for _, config := range newConfigs { + if strings.EqualFold(*config.Name, lbFrontendIPConfigName) { + foundConfig = true + break + } + } + if !foundConfig { + newConfigs = append(newConfigs, + network.FrontendIPConfiguration{ + Name: to.StringPtr(lbFrontendIPConfigName), + Properties: &network.FrontendIPConfigurationPropertiesFormat{ + PublicIPAddress: &network.PublicIPAddress{ + ID: pip.ID, + }, + }, + }) + glog.V(10).Infof("reconcile(%s)(%t): lb frontendconfig(%s) - adding", serviceName, wantLb, lbFrontendIPConfigName) + dirtyConfigs = true + } + } + if dirtyConfigs { + dirtyLb = true + lb.Properties.FrontendIPConfigurations = &newConfigs + } + + // update probes/rules + expectedProbes := make([]network.Probe, len(service.Spec.Ports)) + expectedRules := make([]network.LoadBalancingRule, len(service.Spec.Ports)) + for i, port := range service.Spec.Ports { + lbRuleName := getRuleName(service, port) + + transportProto, _, probeProto, err := getProtocolsFromKubernetesProtocol(port.Protocol) + if err != nil { + return lb, false, err + } + + expectedProbes[i] = network.Probe{ + Name: &lbRuleName, + Properties: &network.ProbePropertiesFormat{ + Protocol: probeProto, + Port: to.Int32Ptr(port.NodePort), + IntervalInSeconds: to.Int32Ptr(5), + NumberOfProbes: to.Int32Ptr(2), + }, + } + + expectedRules[i] = network.LoadBalancingRule{ + Name: &lbRuleName, + Properties: &network.LoadBalancingRulePropertiesFormat{ + Protocol: transportProto, + FrontendIPConfiguration: &network.SubResource{ + ID: to.StringPtr(lbFrontendIPConfigID), + }, + BackendAddressPool: &network.SubResource{ + ID: to.StringPtr(lbBackendPoolID), + }, + Probe: &network.SubResource{ + ID: to.StringPtr(az.getLoadBalancerProbeID(lbName, lbRuleName)), + }, + FrontendPort: to.Int32Ptr(port.Port), + BackendPort: to.Int32Ptr(port.NodePort), + }, + } + } + + // remove unwated probes + dirtyProbes := false + var updatedProbes []network.Probe + if lb.Properties.Probes != nil { + updatedProbes = *lb.Properties.Probes + } + for i := len(updatedProbes) - 1; i >= 0; i-- { + existingProbe := updatedProbes[i] + if serviceOwnsRule(service, *existingProbe.Name) { + glog.V(10).Infof("reconcile(%s)(%t): lb probe(%s) - considering evicting", serviceName, wantLb, *existingProbe.Name) + keepProbe := false + if findProbe(expectedProbes, existingProbe) { + glog.V(10).Infof("reconcile(%s)(%t): lb probe(%s) - keeping", serviceName, wantLb, *existingProbe.Name) + keepProbe = true + } + if !keepProbe { + updatedProbes = append(updatedProbes[:i], updatedProbes[i+1:]...) + glog.V(10).Infof("reconcile(%s)(%t): lb probe(%s) - dropping", serviceName, wantLb, *existingProbe.Name) + dirtyProbes = true + } + } + } + // add missing, wanted probes + for _, expectedProbe := range expectedProbes { + foundProbe := false + if findProbe(updatedProbes, expectedProbe) { + glog.V(10).Infof("reconcile(%s)(%t): lb probe(%s) - already exists", serviceName, wantLb, *expectedProbe.Name) + foundProbe = true + } + if !foundProbe { + glog.V(10).Infof("reconcile(%s)(%t): lb probe(%s) - adding", serviceName, wantLb, *expectedProbe.Name) + updatedProbes = append(updatedProbes, expectedProbe) + dirtyProbes = true + } + } + if dirtyProbes { + dirtyLb = true + lb.Properties.Probes = &updatedProbes + } + + // update rules + dirtyRules := false + var updatedRules []network.LoadBalancingRule + if lb.Properties.LoadBalancingRules != nil { + updatedRules = *lb.Properties.LoadBalancingRules + } + // update rules: remove unwanted + for i := len(updatedRules) - 1; i >= 0; i-- { + existingRule := updatedRules[i] + if serviceOwnsRule(service, *existingRule.Name) { + keepRule := false + glog.V(10).Infof("reconcile(%s)(%t): lb rule(%s) - considering evicting", serviceName, wantLb, *existingRule.Name) + if findRule(expectedRules, existingRule) { + glog.V(10).Infof("reconcile(%s)(%t): lb rule(%s) - keeping", serviceName, wantLb, *existingRule.Name) + keepRule = true + } + if !keepRule { + glog.V(3).Infof("reconcile(%s)(%t): lb rule(%s) - dropping", serviceName, wantLb, *existingRule.Name) + updatedRules = append(updatedRules[:i], updatedRules[i+1:]...) + dirtyRules = true + } + } + } + // update rules: add needed + for _, expectedRule := range expectedRules { + foundRule := false + if findRule(updatedRules, expectedRule) { + glog.V(10).Infof("reconcile(%s)(%t): lb rule(%s) - already exists", serviceName, wantLb, *expectedRule.Name) + foundRule = true + } + if !foundRule { + glog.V(10).Infof("reconcile(%s)(%t): lb rule(%s) adding", serviceName, wantLb, *expectedRule.Name) + updatedRules = append(updatedRules, expectedRule) + dirtyRules = true + } + } + if dirtyRules { + dirtyLb = true + lb.Properties.LoadBalancingRules = &updatedRules + } + + return lb, dirtyLb, nil +} + +// This reconciles the Network Security Group similar to how the LB is reconciled. +// This entails adding required, missing SecurityRules and removing stale rules. +func (az *Cloud) reconcileSecurityGroup(sg network.SecurityGroup, clusterName string, service *api.Service) (network.SecurityGroup, bool, error) { + serviceName := getServiceName(service) + wantLb := len(service.Spec.Ports) > 0 + expectedSecurityRules := make([]network.SecurityRule, len(service.Spec.Ports)) + for i, port := range service.Spec.Ports { + securityRuleName := getRuleName(service, port) + _, securityProto, _, err := getProtocolsFromKubernetesProtocol(port.Protocol) + if err != nil { + return sg, false, err + } + + expectedSecurityRules[i] = network.SecurityRule{ + Name: to.StringPtr(securityRuleName), + Properties: &network.SecurityRulePropertiesFormat{ + Protocol: securityProto, + SourcePortRange: to.StringPtr("*"), + DestinationPortRange: to.StringPtr(strconv.Itoa(int(port.NodePort))), + SourceAddressPrefix: to.StringPtr("Internet"), + DestinationAddressPrefix: to.StringPtr("*"), + Access: network.Allow, + Direction: network.Inbound, + }, + } + } + + // update security rules + dirtySg := false + var updatedRules []network.SecurityRule + if sg.Properties.SecurityRules != nil { + updatedRules = *sg.Properties.SecurityRules + } + // update security rules: remove unwanted + for i := len(updatedRules) - 1; i >= 0; i-- { + existingRule := updatedRules[i] + if serviceOwnsRule(service, *existingRule.Name) { + glog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - considering evicting", serviceName, wantLb, *existingRule.Name) + keepRule := false + if findSecurityRule(expectedSecurityRules, existingRule) { + glog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - keeping", serviceName, wantLb, *existingRule.Name) + keepRule = true + } + if !keepRule { + glog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - dropping", serviceName, wantLb, *existingRule.Name) + updatedRules = append(updatedRules[:i], updatedRules[i+1:]...) + dirtySg = true + } + } + } + // update security rules: add needed + for _, expectedRule := range expectedSecurityRules { + foundRule := false + if findSecurityRule(updatedRules, expectedRule) { + glog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - already exists", serviceName, wantLb, *expectedRule.Name) + foundRule = true + } + if !foundRule { + glog.V(10).Infof("reconcile(%s)(%t): sg rule(%s) - adding", serviceName, wantLb, *expectedRule.Name) + + nextAvailablePriority, err := getNextAvailablePriority(updatedRules) + if err != nil { + return sg, false, err + } + + expectedRule.Properties.Priority = to.Int32Ptr(nextAvailablePriority) + updatedRules = append(updatedRules, expectedRule) + dirtySg = true + } + } + if dirtySg { + sg.Properties.SecurityRules = &updatedRules + } + return sg, dirtySg, nil +} + +func findProbe(probes []network.Probe, probe network.Probe) bool { + for _, existingProbe := range probes { + if strings.EqualFold(*existingProbe.Name, *probe.Name) { + return true + } + } + return false +} + +func findRule(rules []network.LoadBalancingRule, rule network.LoadBalancingRule) bool { + for _, existingRule := range rules { + if strings.EqualFold(*existingRule.Name, *rule.Name) { + return true + } + } + return false +} + +func findSecurityRule(rules []network.SecurityRule, rule network.SecurityRule) bool { + for _, existingRule := range rules { + if strings.EqualFold(*existingRule.Name, *rule.Name) { + return true + } + } + return false +} + +// This ensures the given VM's Primary NIC's Primary IP Configuration is +// participating in the specified LoadBalancer Backend Pool. +func (az *Cloud) ensureHostInPool(serviceName, machineName string, backendPoolID string) error { + machine, err := az.VirtualMachinesClient.Get(az.ResourceGroup, machineName, "") + if err != nil { + return err + } + + primaryNicID, err := getPrimaryInterfaceID(machine) + if err != nil { + return err + } + nicName, err := getLastSegment(primaryNicID) + if err != nil { + return err + } + + nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "") + if err != nil { + return err + } + + var primaryIPConfig *network.InterfaceIPConfiguration + primaryIPConfig, err = getPrimaryIPConfig(nic) + if err != nil { + return err + } + + foundPool := false + newBackendPools := []network.BackendAddressPool{} + if primaryIPConfig.Properties.LoadBalancerBackendAddressPools != nil { + newBackendPools = *primaryIPConfig.Properties.LoadBalancerBackendAddressPools + } + for _, existingPool := range newBackendPools { + if strings.EqualFold(backendPoolID, *existingPool.ID) { + foundPool = true + break + } + } + if !foundPool { + newBackendPools = append(newBackendPools, + network.BackendAddressPool{ + ID: to.StringPtr(backendPoolID), + }) + + primaryIPConfig.Properties.LoadBalancerBackendAddressPools = &newBackendPools + + glog.V(3).Infof("nicupdate(%s): nic(%s) - updating", serviceName, nicName) + _, err := az.InterfacesClient.CreateOrUpdate(az.ResourceGroup, *nic.Name, nic, nil) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/cloudprovider/providers/azure/azure_routes.go b/pkg/cloudprovider/providers/azure/azure_routes.go new file mode 100644 index 00000000000..00f12a4375f --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_routes.go @@ -0,0 +1,161 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/cloudprovider" + + "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest/to" + "github.com/golang/glog" +) + +// ListRoutes lists all managed routes that belong to the specified clusterName +func (az *Cloud) ListRoutes(clusterName string) (routes []*cloudprovider.Route, err error) { + glog.V(10).Infof("list: START clusterName=%q", clusterName) + routeTable, existsRouteTable, err := az.getRouteTable() + if err != nil { + return nil, err + } + if !existsRouteTable { + return []*cloudprovider.Route{}, nil + } + + var kubeRoutes []*cloudprovider.Route + if routeTable.Properties.Routes != nil { + kubeRoutes = make([]*cloudprovider.Route, len(*routeTable.Properties.Routes)) + for i, route := range *routeTable.Properties.Routes { + instance := getInstanceName(*route.Name) + cidr := *route.Properties.AddressPrefix + glog.V(10).Infof("list: * instance=%q, cidr=%q", instance, cidr) + + kubeRoutes[i] = &cloudprovider.Route{ + Name: *route.Name, + TargetInstance: instance, + DestinationCIDR: cidr, + } + } + } + + glog.V(10).Info("list: FINISH") + return kubeRoutes, nil +} + +// CreateRoute creates the described managed route +// route.Name will be ignored, although the cloud-provider may use nameHint +// to create a more user-meaningful name. +func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *cloudprovider.Route) error { + glog.V(2).Infof("create: creating route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetInstance, kubeRoute.DestinationCIDR) + + routeTable, existsRouteTable, err := az.getRouteTable() + if err != nil { + return err + } + if !existsRouteTable { + routeTable = network.RouteTable{ + Name: to.StringPtr(az.RouteTableName), + Location: to.StringPtr(az.Location), + Properties: &network.RouteTablePropertiesFormat{}, + } + + glog.V(3).Infof("create: creating routetable. routeTableName=%q", az.RouteTableName) + _, err = az.RouteTablesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, routeTable, nil) + if err != nil { + return err + } + + routeTable, err = az.RouteTablesClient.Get(az.ResourceGroup, az.RouteTableName, "") + if err != nil { + return err + } + } + + // ensure the subnet is properly configured + subnet, err := az.SubnetsClient.Get(az.ResourceGroup, az.VnetName, az.SubnetName, "") + if err != nil { + // 404 is fatal here + return err + } + if subnet.Properties.RouteTable != nil { + if *subnet.Properties.RouteTable.ID != *routeTable.ID { + return fmt.Errorf("The subnet has a route table, but it was unrecognized. Refusing to modify it. active_routetable=%q expected_routetable=%q", *subnet.Properties.RouteTable.ID, *routeTable.ID) + } + } else { + subnet.Properties.RouteTable = &network.RouteTable{ + ID: routeTable.ID, + } + glog.V(3).Info("create: updating subnet") + _, err := az.SubnetsClient.CreateOrUpdate(az.ResourceGroup, az.VnetName, az.SubnetName, subnet, nil) + if err != nil { + return err + } + } + + targetIP, err := az.getIPForMachine(kubeRoute.TargetInstance) + if err != nil { + return err + } + + routeName := getRouteName(kubeRoute.TargetInstance) + route := network.Route{ + Name: to.StringPtr(routeName), + Properties: &network.RoutePropertiesFormat{ + AddressPrefix: to.StringPtr(kubeRoute.DestinationCIDR), + NextHopType: network.RouteNextHopTypeVirtualAppliance, + NextHopIPAddress: to.StringPtr(targetIP), + }, + } + + glog.V(3).Infof("create: creating route: instance=%q cidr=%q", kubeRoute.TargetInstance, kubeRoute.DestinationCIDR) + _, err = az.RoutesClient.CreateOrUpdate(az.ResourceGroup, az.RouteTableName, *route.Name, route, nil) + if err != nil { + return err + } + + glog.V(2).Infof("create: route created. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetInstance, kubeRoute.DestinationCIDR) + return nil +} + +// DeleteRoute deletes the specified managed route +// Route should be as returned by ListRoutes +func (az *Cloud) DeleteRoute(clusterName string, kubeRoute *cloudprovider.Route) error { + glog.V(2).Infof("delete: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetInstance, kubeRoute.DestinationCIDR) + + routeName := getRouteName(kubeRoute.TargetInstance) + _, err := az.RoutesClient.Delete(az.ResourceGroup, az.RouteTableName, routeName, nil) + if err != nil { + return err + } + + glog.V(2).Infof("delete: route deleted. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetInstance, kubeRoute.DestinationCIDR) + return nil +} + +// This must be kept in sync with getInstanceName. +// These two functions enable stashing the instance name in the route +// and then retrieving it later when listing. This is needed because +// Azure does not let you put tags/descriptions on the Route itself. +func getRouteName(instanceName string) string { + return fmt.Sprintf("%s", instanceName) +} + +// Used with getRouteName. See comment on getRouteName. +func getInstanceName(routeName string) string { + return fmt.Sprintf("%s", routeName) +} diff --git a/pkg/cloudprovider/providers/azure/azure_test.go b/pkg/cloudprovider/providers/azure/azure_test.go new file mode 100644 index 00000000000..65164ed510c --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_test.go @@ -0,0 +1,517 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "strings" + "testing" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/types" + + "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest/to" +) + +var testClusterName = "testCluster" + +// Test additional of a new service/port. +func TestReconcileLoadBalancerAddPort(t *testing.T) { + az := getTestCloud() + svc := getTestService("servicea", 80) + pip := getTestPublicIP() + lb := getTestLoadBalancer() + hosts := []string{} + + lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, hosts) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + if !updated { + t.Error("Expected the loadbalancer to need an update") + } + + // ensure we got a frontend ip configuration + if len(*lb.Properties.FrontendIPConfigurations) != 1 { + t.Error("Expected the loadbalancer to have a frontend ip configuration") + } + + validateLoadBalancer(t, lb, svc) +} + +// Test removing all services results in removing the frontend ip configuration +func TestReconcileLoadBalancerRemoveAllPortsRemovesFrontendConfig(t *testing.T) { + az := getTestCloud() + svc := getTestService("servicea", 80) + lb := getTestLoadBalancer() + pip := getTestPublicIP() + hosts := []string{} + + lb, updated, err := az.reconcileLoadBalancer(lb, &pip, testClusterName, &svc, hosts) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + svcUpdated := getTestService("servicea") + lb, updated, err = az.reconcileLoadBalancer(lb, nil, testClusterName, &svcUpdated, hosts) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + if !updated { + t.Error("Expected the loadbalancer to need an update") + } + + // ensure we abandonded the frontend ip configuration + if len(*lb.Properties.FrontendIPConfigurations) != 0 { + t.Error("Expected the loadbalancer to have no frontend ip configuration") + } + + validateLoadBalancer(t, lb, svcUpdated) +} + +// Test removal of a port from an existing service. +func TestReconcileLoadBalancerRemovesPort(t *testing.T) { + az := getTestCloud() + svc := getTestService("servicea", 80, 443) + pip := getTestPublicIP() + hosts := []string{} + + existingLoadBalancer := getTestLoadBalancer(svc) + + svcUpdated := getTestService("servicea", 80) + updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &pip, testClusterName, &svcUpdated, hosts) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + validateLoadBalancer(t, updatedLoadBalancer, svcUpdated) +} + +// Test reconciliation of multiple services on same port +func TestReconcileLoadBalancerMultipleServices(t *testing.T) { + az := getTestCloud() + svc1 := getTestService("servicea", 80, 443) + svc2 := getTestService("serviceb", 80) + pip := getTestPublicIP() + hosts := []string{} + + existingLoadBalancer := getTestLoadBalancer() + + updatedLoadBalancer, _, err := az.reconcileLoadBalancer(existingLoadBalancer, &pip, testClusterName, &svc1, hosts) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + updatedLoadBalancer, _, err = az.reconcileLoadBalancer(updatedLoadBalancer, &pip, testClusterName, &svc2, hosts) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + validateLoadBalancer(t, updatedLoadBalancer, svc1, svc2) +} + +func TestReconcileSecurityGroupNewServiceAddsPort(t *testing.T) { + az := getTestCloud() + svc1 := getTestService("serviceea", 80) + + sg := getTestSecurityGroup() + + sg, _, err := az.reconcileSecurityGroup(sg, testClusterName, &svc1) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + validateSecurityGroup(t, sg, svc1) +} + +func TestReconcileSecurityGroupRemoveServiceRemovesPort(t *testing.T) { + az := getTestCloud() + svc := getTestService("servicea", 80, 443) + + sg := getTestSecurityGroup(svc) + + svcUpdated := getTestService("servicea", 80) + sg, _, err := az.reconcileSecurityGroup(sg, testClusterName, &svcUpdated) + if err != nil { + t.Errorf("Unexpected error: %q", err) + } + + validateSecurityGroup(t, sg, svcUpdated) +} + +func getTestCloud() *Cloud { + return &Cloud{ + Config: Config{ + TenantID: "tenant", + SubscriptionID: "subscription", + ResourceGroup: "rg", + Location: "westus", + VnetName: "vnet", + SubnetName: "subnet", + SecurityGroupName: "nsg", + RouteTableName: "rt", + }, + } +} + +func getBackendPort(port int32) int32 { + return port + 10000 +} + +func getTestPublicIP() network.PublicIPAddress { + pip := network.PublicIPAddress{} + pip.ID = to.StringPtr("/this/is/a/public/ip/address/id") + return pip +} + +func getTestService(identifier string, requestedPorts ...int32) api.Service { + ports := []api.ServicePort{} + for _, port := range requestedPorts { + ports = append(ports, api.ServicePort{ + Name: fmt.Sprintf("port-%d", port), + Protocol: api.ProtocolTCP, + Port: port, + NodePort: getBackendPort(port), + }) + } + + svc := api.Service{ + Spec: api.ServiceSpec{ + Type: api.ServiceTypeLoadBalancer, + Ports: ports, + }, + } + svc.Name = identifier + svc.Namespace = "default" + svc.UID = types.UID(identifier) + + return svc +} + +func getTestLoadBalancer(services ...api.Service) network.LoadBalancer { + rules := []network.LoadBalancingRule{} + probes := []network.Probe{} + + for _, service := range services { + for _, port := range service.Spec.Ports { + ruleName := getRuleName(&service, port) + rules = append(rules, network.LoadBalancingRule{ + Name: to.StringPtr(ruleName), + Properties: &network.LoadBalancingRulePropertiesFormat{ + FrontendPort: to.Int32Ptr(port.Port), + BackendPort: to.Int32Ptr(port.NodePort), + }, + }) + probes = append(probes, network.Probe{ + Name: to.StringPtr(ruleName), + Properties: &network.ProbePropertiesFormat{ + Port: to.Int32Ptr(port.NodePort), + }, + }) + } + } + + lb := network.LoadBalancer{ + Properties: &network.LoadBalancerPropertiesFormat{ + LoadBalancingRules: &rules, + Probes: &probes, + }, + } + + return lb +} + +func getTestSecurityGroup(services ...api.Service) network.SecurityGroup { + rules := []network.SecurityRule{} + + for _, service := range services { + for _, port := range service.Spec.Ports { + ruleName := getRuleName(&service, port) + rules = append(rules, network.SecurityRule{ + Name: to.StringPtr(ruleName), + Properties: &network.SecurityRulePropertiesFormat{ + DestinationPortRange: to.StringPtr(fmt.Sprintf("%d", port.NodePort)), + }, + }) + } + } + + sg := network.SecurityGroup{ + Properties: &network.SecurityGroupPropertiesFormat{ + SecurityRules: &rules, + }, + } + + return sg +} + +func validateLoadBalancer(t *testing.T, loadBalancer network.LoadBalancer, services ...api.Service) { + expectedRuleCount := 0 + for _, svc := range services { + for _, wantedRule := range svc.Spec.Ports { + expectedRuleCount++ + wantedRuleName := getRuleName(&svc, wantedRule) + foundRule := false + for _, actualRule := range *loadBalancer.Properties.LoadBalancingRules { + if strings.EqualFold(*actualRule.Name, wantedRuleName) && + *actualRule.Properties.FrontendPort == wantedRule.Port && + *actualRule.Properties.BackendPort == wantedRule.NodePort { + foundRule = true + break + } + } + if !foundRule { + t.Errorf("Expected rule but didn't find it: %q", wantedRuleName) + } + + foundProbe := false + for _, actualProbe := range *loadBalancer.Properties.Probes { + if strings.EqualFold(*actualProbe.Name, wantedRuleName) && + *actualProbe.Properties.Port == wantedRule.NodePort { + foundProbe = true + break + } + } + if !foundProbe { + t.Errorf("Expected probe but didn't find it: %q", wantedRuleName) + } + } + } + + lenRules := len(*loadBalancer.Properties.LoadBalancingRules) + if lenRules != expectedRuleCount { + t.Errorf("Expected the loadbalancer to have %d rules. Found %d.", expectedRuleCount, lenRules) + } + lenProbes := len(*loadBalancer.Properties.Probes) + if lenProbes != expectedRuleCount { + t.Errorf("Expected the loadbalancer to have %d probes. Found %d.", expectedRuleCount, lenProbes) + } +} + +func validateSecurityGroup(t *testing.T, securityGroup network.SecurityGroup, services ...api.Service) { + expectedRuleCount := 0 + for _, svc := range services { + for _, wantedRule := range svc.Spec.Ports { + expectedRuleCount++ + wantedRuleName := getRuleName(&svc, wantedRule) + foundRule := false + for _, actualRule := range *securityGroup.Properties.SecurityRules { + if strings.EqualFold(*actualRule.Name, wantedRuleName) && + *actualRule.Properties.DestinationPortRange == fmt.Sprintf("%d", wantedRule.NodePort) { + foundRule = true + break + } + } + if !foundRule { + t.Errorf("Expected rule but didn't find it: %q", wantedRuleName) + } + } + } + + lenRules := len(*securityGroup.Properties.SecurityRules) + if lenRules != expectedRuleCount { + t.Errorf("Expected the loadbalancer to have %d rules. Found %d.", expectedRuleCount, lenRules) + } +} + +func TestSecurityRulePriorityPicksNextAvailablePriority(t *testing.T) { + rules := []network.SecurityRule{} + + var expectedPriority int32 = loadBalancerMinimumPriority + 50 + + var i int32 + for i = loadBalancerMinimumPriority; i < expectedPriority; i++ { + rules = append(rules, network.SecurityRule{ + Properties: &network.SecurityRulePropertiesFormat{ + Priority: to.Int32Ptr(i), + }, + }) + } + + priority, err := getNextAvailablePriority(rules) + if err != nil { + t.Errorf("Unexpectected error: %q", err) + } + + if priority != expectedPriority { + t.Errorf("Expected priority %d. Got priority %d.", expectedPriority, priority) + } +} + +func TestSecurityRulePriorityFailsIfExhausted(t *testing.T) { + rules := []network.SecurityRule{} + + var i int32 + for i = loadBalancerMinimumPriority; i < loadBalancerMaximumPriority; i++ { + rules = append(rules, network.SecurityRule{ + Properties: &network.SecurityRulePropertiesFormat{ + Priority: to.Int32Ptr(i), + }, + }) + } + + _, err := getNextAvailablePriority(rules) + if err == nil { + t.Error("Expectected an error. There are no priority levels left.") + } +} + +func TestProtocolTranslationTCP(t *testing.T) { + proto := api.ProtocolTCP + transportProto, securityGroupProto, probeProto, err := getProtocolsFromKubernetesProtocol(proto) + if err != nil { + t.Error(err) + } + + if transportProto != network.TransportProtocolTCP { + t.Errorf("Expected TCP LoadBalancer Rule Protocol. Got %v", transportProto) + } + if securityGroupProto != network.TCP { + t.Errorf("Expected TCP SecurityGroup Protocol. Got %v", transportProto) + } + if probeProto != network.ProbeProtocolTCP { + t.Errorf("Expected TCP LoadBalancer Probe Protocol. Got %v", transportProto) + } +} + +func TestProtocolTranslationUDP(t *testing.T) { + proto := api.ProtocolUDP + _, _, _, err := getProtocolsFromKubernetesProtocol(proto) + if err == nil { + t.Error("Expected an error. UDP is unsupported.") + } +} + +// Test Configuration deserialization (json) +func TestNewCloudFromJSON(t *testing.T) { + config := `{ + "tenantId": "--tenant-id--", + "subscriptionId": "--subscription-id--", + "aadClientId": "--aad-client-id--", + "aadClientSecret": "--aad-client-secret--", + "resourceGroup": "--resource-group--", + "location": "--location--", + "subnetName": "--subnet-name--", + "securityGroupName": "--security-group-name--", + "vnetName": "--vnet-name--", + "routeTableName": "--route-table-name--" + }` + validateConfig(t, config) +} + +// Test Configuration deserialization (yaml) +func TestNewCloudFromYAML(t *testing.T) { + config := ` +tenantId: --tenant-id-- +subscriptionId: --subscription-id-- +aadClientId: --aad-client-id-- +aadClientSecret: --aad-client-secret-- +resourceGroup: --resource-group-- +location: --location-- +subnetName: --subnet-name-- +securityGroupName: --security-group-name-- +vnetName: --vnet-name-- +routeTableName: --route-table-name-- +` + validateConfig(t, config) +} + +func validateConfig(t *testing.T, config string) { + configReader := strings.NewReader(config) + cloud, err := NewCloud(configReader) + if err != nil { + t.Error(err) + } + + azureCloud, ok := cloud.(*Cloud) + if !ok { + t.Error("NewCloud returned incorrect type") + } + + if azureCloud.TenantID != "--tenant-id--" { + t.Errorf("got incorrect value for TenantID") + } + if azureCloud.SubscriptionID != "--subscription-id--" { + t.Errorf("got incorrect value for SubscriptionID") + } + if azureCloud.AADClientID != "--aad-client-id--" { + t.Errorf("got incorrect value for AADClientID") + } + if azureCloud.AADClientSecret != "--aad-client-secret--" { + t.Errorf("got incorrect value for AADClientSecret") + } + if azureCloud.ResourceGroup != "--resource-group--" { + t.Errorf("got incorrect value for ResourceGroup") + } + if azureCloud.Location != "--location--" { + t.Errorf("got incorrect value for Location") + } + if azureCloud.SubnetName != "--subnet-name--" { + t.Errorf("got incorrect value for SubnetName") + } + if azureCloud.SecurityGroupName != "--security-group-name--" { + t.Errorf("got incorrect value for SecurityGroupName") + } + if azureCloud.VnetName != "--vnet-name--" { + t.Errorf("got incorrect value for VnetName") + } + if azureCloud.RouteTableName != "--route-table-name--" { + t.Errorf("got incorrect value for RouteTableName") + } +} + +func TestDecodeInstanceInfo(t *testing.T) { + response := `{"ID":"_azdev","UD":"0","FD":"99"}` + + faultDomain, err := readFaultDomain(strings.NewReader(response)) + if err != nil { + t.Error("Unexpected error in ReadFaultDomain") + } + + if faultDomain == nil { + t.Error("Fault domain was unexpectedly nil") + } + + if *faultDomain != "99" { + t.Error("got incorrect fault domain") + } +} + +func TestFilterNodes(t *testing.T) { + nodes := []compute.VirtualMachine{ + {Name: to.StringPtr("test")}, + {Name: to.StringPtr("test2")}, + {Name: to.StringPtr("3test")}, + } + + filteredNodes, err := filterNodes(nodes, "^test$") + if err != nil { + t.Errorf("Unexpeted error when filtering: %q", err) + } + + if len(filteredNodes) != 1 { + t.Error("Got too many nodes after filtering") + } + + if *filteredNodes[0].Name != "test" { + t.Error("Get the wrong node after filtering") + } +} diff --git a/pkg/cloudprovider/providers/azure/azure_util.go b/pkg/cloudprovider/providers/azure/azure_util.go new file mode 100644 index 00000000000..4fe5aa4ebd6 --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_util.go @@ -0,0 +1,236 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "fmt" + "strings" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/cloudprovider" + + "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/Azure/azure-sdk-for-go/arm/network" +) + +const ( + loadBalancerMinimumPriority = 500 + loadBalancerMaximumPriority = 4096 + + machineResourceIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Compute/virtualMachines/%s" + frontendIPConfigIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/loadBalancers/%s/frontendIPConfigurations/%s" + backendPoolIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/loadBalancers/%s/backendAddressPools/%s" + loadBalancerRuleIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/loadBalancers/%s/loadBalancingRules/%s" + loadBalancerProbeIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/loadBalancers/%s/probes/%s" + securityRuleIDTemplate = "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.Network/networkSecurityGroups/%s/securityRules/%s" +) + +// returns the full identifier of a machine +func (az *Cloud) getMachineID(machineName string) string { + return fmt.Sprintf( + machineResourceIDTemplate, + az.SubscriptionID, + az.ResourceGroup, + machineName) +} + +// returns the full identifier of a loadbalancer frontendipconfiguration. +func (az *Cloud) getFrontendIPConfigID(lbName, backendPoolName string) string { + return fmt.Sprintf( + frontendIPConfigIDTemplate, + az.SubscriptionID, + az.ResourceGroup, + lbName, + backendPoolName) +} + +// returns the full identifier of a loadbalancer backendpool. +func (az *Cloud) getBackendPoolID(lbName, backendPoolName string) string { + return fmt.Sprintf( + backendPoolIDTemplate, + az.SubscriptionID, + az.ResourceGroup, + lbName, + backendPoolName) +} + +// returns the full identifier of a loadbalancer rule. +func (az *Cloud) getLoadBalancerRuleID(lbName, lbRuleName string) string { + return fmt.Sprintf( + loadBalancerRuleIDTemplate, + az.SubscriptionID, + az.ResourceGroup, + lbName, + lbRuleName) +} + +// returns the full identifier of a loadbalancer probe. +func (az *Cloud) getLoadBalancerProbeID(lbName, lbRuleName string) string { + return fmt.Sprintf( + loadBalancerProbeIDTemplate, + az.SubscriptionID, + az.ResourceGroup, + lbName, + lbRuleName) +} + +// returns the full identifier of a network security group security rule. +func (az *Cloud) getSecurityRuleID(securityRuleName string) string { + return fmt.Sprintf( + securityRuleIDTemplate, + az.SubscriptionID, + az.ResourceGroup, + az.SecurityGroupName, + securityRuleName) +} + +// returns the deepest child's identifier from a full identifier string. +func getLastSegment(ID string) (string, error) { + parts := strings.Split(ID, "/") + name := parts[len(parts)-1] + if len(name) == 0 { + return "", fmt.Errorf("resource name was missing from identifier") + } + + return name, nil +} + +// returns the equivalent LoadBalancerRule, SecurityRule and LoadBalancerProbe +// protocol types for the given Kubernetes protocol type. +func getProtocolsFromKubernetesProtocol(protocol api.Protocol) (network.TransportProtocol, network.SecurityRuleProtocol, network.ProbeProtocol, error) { + switch protocol { + case api.ProtocolTCP: + return network.TransportProtocolTCP, network.TCP, network.ProbeProtocolTCP, nil + default: + return "", "", "", fmt.Errorf("Only TCP is supported for Azure LoadBalancers") + } +} + +// This returns the full identifier of the primary NIC for the given VM. +func getPrimaryInterfaceID(machine compute.VirtualMachine) (string, error) { + if len(*machine.Properties.NetworkProfile.NetworkInterfaces) == 1 { + return *(*machine.Properties.NetworkProfile.NetworkInterfaces)[0].ID, nil + } + + for _, ref := range *machine.Properties.NetworkProfile.NetworkInterfaces { + if *ref.Properties.Primary { + return *ref.ID, nil + } + } + + return "", fmt.Errorf("failed to find a primary nic for the vm. vmname=%q", *machine.Name) +} + +func getPrimaryIPConfig(nic network.Interface) (*network.InterfaceIPConfiguration, error) { + if len(*nic.Properties.IPConfigurations) == 1 { + return &((*nic.Properties.IPConfigurations)[0]), nil + } + + // we're here because we either have multiple ipconfigs and can't determine the primary: + // https://github.com/Azure/azure-rest-api-specs/issues/305 + // or somehow we had zero ipconfigs + return nil, fmt.Errorf("failed to determine the determine primary ipconfig. nicname=%q", *nic.Name) +} + +func getLoadBalancerName(clusterName string) string { + return clusterName +} + +func getBackendPoolName(clusterName string) string { + return clusterName +} + +func getRuleName(service *api.Service, port api.ServicePort) string { + return fmt.Sprintf("%s-%s-%d-%d", getRulePrefix(service), port.Protocol, port.Port, port.NodePort) +} + +// This returns a human-readable version of the Service used to tag some resources. +// This is only used for human-readable convenience, and not to filter. +func getServiceName(service *api.Service) string { + return fmt.Sprintf("%s/%s", service.Namespace, service.Name) +} + +// This returns a prefix for loadbalancer/security rules. +func getRulePrefix(service *api.Service) string { + return cloudprovider.GetLoadBalancerName(service) +} + +func serviceOwnsRule(service *api.Service, rule string) bool { + prefix := getRulePrefix(service) + return strings.HasPrefix(strings.ToUpper(rule), strings.ToUpper(prefix)) +} + +func getFrontendIPConfigName(service *api.Service) string { + return cloudprovider.GetLoadBalancerName(service) +} + +func getPublicIPName(clusterName string, service *api.Service) string { + return fmt.Sprintf("%s-%s", clusterName, cloudprovider.GetLoadBalancerName(service)) +} + +// This returns the next available rule priority level for a given set of security rules. +func getNextAvailablePriority(rules []network.SecurityRule) (int32, error) { + var smallest int32 = loadBalancerMinimumPriority + var spread int32 = 1 + +outer: + for smallest < loadBalancerMaximumPriority { + for _, rule := range rules { + if *rule.Properties.Priority == smallest { + smallest += spread + continue outer + } + } + // no one else had it + return smallest, nil + } + + return -1, fmt.Errorf("SecurityGroup priorities are exhausted") +} + +func (az *Cloud) getIPForMachine(machineName string) (string, error) { + machine, exists, err := az.getVirtualMachine(machineName) + if !exists { + return "", cloudprovider.InstanceNotFound + } + if err != nil { + return "", err + } + + nicID, err := getPrimaryInterfaceID(machine) + if err != nil { + return "", err + } + + nicName, err := getLastSegment(nicID) + if err != nil { + return "", err + } + + nic, err := az.InterfacesClient.Get(az.ResourceGroup, nicName, "") + if err != nil { + return "", err + } + + ipConfig, err := getPrimaryIPConfig(nic) + if err != nil { + return "", err + } + + targetIP := *ipConfig.Properties.PrivateIPAddress + return targetIP, nil +} diff --git a/pkg/cloudprovider/providers/azure/azure_wrap.go b/pkg/cloudprovider/providers/azure/azure_wrap.go new file mode 100644 index 00000000000..782c751c246 --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_wrap.go @@ -0,0 +1,124 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "net/http" + + "github.com/Azure/azure-sdk-for-go/arm/compute" + "github.com/Azure/azure-sdk-for-go/arm/network" + "github.com/Azure/go-autorest/autorest" +) + +// checkExistsFromError inspects an error and returns a true if err is nil, +// false if error is an autorest.Error with StatusCode=404 and will return the +// error back if error is another status code or another type of error. +func checkResourceExistsFromError(err error) (bool, error) { + if err == nil { + return true, nil + } + v, ok := err.(autorest.DetailedError) + if ok && v.StatusCode == http.StatusNotFound { + return false, nil + } + return false, v +} + +func (az *Cloud) getVirtualMachine(machineName string) (vm compute.VirtualMachine, exists bool, err error) { + var realErr error + + vm, err = az.VirtualMachinesClient.Get(az.ResourceGroup, machineName, "") + + exists, realErr = checkResourceExistsFromError(err) + if realErr != nil { + return vm, false, realErr + } + + if !exists { + return vm, false, nil + } + + return vm, exists, err +} + +func (az *Cloud) getRouteTable() (routeTable network.RouteTable, exists bool, err error) { + var realErr error + + routeTable, err = az.RouteTablesClient.Get(az.ResourceGroup, az.RouteTableName, "") + + exists, realErr = checkResourceExistsFromError(err) + if realErr != nil { + return routeTable, false, realErr + } + + if !exists { + return routeTable, false, nil + } + + return routeTable, exists, err +} + +func (az *Cloud) getSecurityGroup() (sg network.SecurityGroup, exists bool, err error) { + var realErr error + + sg, err = az.SecurityGroupsClient.Get(az.ResourceGroup, az.SecurityGroupName, "") + + exists, realErr = checkResourceExistsFromError(err) + if realErr != nil { + return sg, false, realErr + } + + if !exists { + return sg, false, nil + } + + return sg, exists, err +} + +func (az *Cloud) getAzureLoadBalancer(name string) (lb network.LoadBalancer, exists bool, err error) { + var realErr error + + lb, err = az.LoadBalancerClient.Get(az.ResourceGroup, name, "") + + exists, realErr = checkResourceExistsFromError(err) + if realErr != nil { + return lb, false, realErr + } + + if !exists { + return lb, false, nil + } + + return lb, exists, err +} + +func (az *Cloud) getPublicIPAddress(name string) (pip network.PublicIPAddress, exists bool, err error) { + var realErr error + + pip, err = az.PublicIPAddressesClient.Get(az.ResourceGroup, name, "") + + exists, realErr = checkResourceExistsFromError(err) + if realErr != nil { + return pip, false, realErr + } + + if !exists { + return pip, false, nil + } + + return pip, exists, err +} diff --git a/pkg/cloudprovider/providers/azure/azure_zones.go b/pkg/cloudprovider/providers/azure/azure_zones.go new file mode 100644 index 00000000000..ab2dd9e3b27 --- /dev/null +++ b/pkg/cloudprovider/providers/azure/azure_zones.go @@ -0,0 +1,78 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package azure + +import ( + "encoding/json" + "io" + "io/ioutil" + "net/http" + "sync" + + "k8s.io/kubernetes/pkg/cloudprovider" +) + +const instanceInfoURL = "http://169.254.169.254/metadata/v1/InstanceInfo" + +var faultMutex = &sync.Mutex{} +var faultDomain *string + +type instanceInfo struct { + ID string `json:"ID"` + UpdateDomain string `json:"UD"` + FaultDomain string `json:"FD"` +} + +// GetZone returns the Zone containing the current failure zone and locality region that the program is running in +func (az *Cloud) GetZone() (cloudprovider.Zone, error) { + faultMutex.Lock() + if faultDomain == nil { + var err error + faultDomain, err = fetchFaultDomain() + if err != nil { + return cloudprovider.Zone{}, err + } + } + zone := cloudprovider.Zone{ + FailureDomain: *faultDomain, + Region: az.Location, + } + faultMutex.Unlock() + return zone, nil +} + +func fetchFaultDomain() (*string, error) { + resp, err := http.Get(instanceInfoURL) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return readFaultDomain(resp.Body) +} + +func readFaultDomain(reader io.Reader) (*string, error) { + var instanceInfo instanceInfo + body, err := ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + err = json.Unmarshal(body, &instanceInfo) + if err != nil { + return nil, err + } + return &instanceInfo.FaultDomain, nil +} diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go index 5801ca2bbf4..6b2bd09446e 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -121,7 +121,7 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) { } // GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer. -func (f *FakeCloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) { +func (f *FakeCloud) GetLoadBalancer(clusterName string, service *api.Service) (*api.LoadBalancerStatus, bool, error) { status := &api.LoadBalancerStatus{} status.Ingress = []api.LoadBalancerIngress{{IP: f.ExternalIP.String()}} @@ -130,7 +130,7 @@ func (f *FakeCloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStat // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer. // It adds an entry "create" into the internal method call record. -func (f *FakeCloud) EnsureLoadBalancer(service *api.Service, hosts []string) (*api.LoadBalancerStatus, error) { +func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *api.Service, hosts []string) (*api.LoadBalancerStatus, error) { f.addCall("create") if f.Balancers == nil { f.Balancers = make(map[string]FakeBalancer) @@ -155,7 +155,7 @@ func (f *FakeCloud) EnsureLoadBalancer(service *api.Service, hosts []string) (*a // UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer. // It adds an entry "update" into the internal method call record. -func (f *FakeCloud) UpdateLoadBalancer(service *api.Service, hosts []string) error { +func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *api.Service, hosts []string) error { f.addCall("update") f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, hosts}) return f.Err @@ -163,7 +163,7 @@ func (f *FakeCloud) UpdateLoadBalancer(service *api.Service, hosts []string) err // EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted. // It adds an entry "delete" into the internal method call record. -func (f *FakeCloud) EnsureLoadBalancerDeleted(service *api.Service) error { +func (f *FakeCloud) EnsureLoadBalancerDeleted(clusterName string, service *api.Service) error { f.addCall("delete") return f.Err } diff --git a/pkg/cloudprovider/providers/openstack/openstack_test.go b/pkg/cloudprovider/providers/openstack/openstack_test.go index d11cdd16d3e..5a4b1ce98a1 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_test.go +++ b/pkg/cloudprovider/providers/openstack/openstack_test.go @@ -31,6 +31,7 @@ import ( const volumeAvailableStatus = "available" const volumeInUseStatus = "in-use" const volumeCreateTimeoutSeconds = 30 +const testClusterName = "testCluster" func WaitForVolumeStatus(t *testing.T, os *OpenStack, volumeName string, status string, timeoutSeconds int) { timeout := timeoutSeconds @@ -216,7 +217,7 @@ func TestLoadBalancer(t *testing.T) { t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?") } - _, exists, err := lb.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}}) + _, exists, err := lb.GetLoadBalancer(testClusterName, &api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}}) if err != nil { t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err) } @@ -242,7 +243,7 @@ func TestLoadBalancerV2(t *testing.T) { t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?") } - _, exists, err := lbaas.GetLoadBalancer(&api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}}) + _, exists, err := lbaas.GetLoadBalancer(testClusterName, &api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}}) if err != nil { t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err) } diff --git a/pkg/cloudprovider/providers/providers.go b/pkg/cloudprovider/providers/providers.go index cd9aadeea3e..633b3e21eed 100644 --- a/pkg/cloudprovider/providers/providers.go +++ b/pkg/cloudprovider/providers/providers.go @@ -19,6 +19,7 @@ package cloudprovider import ( // Cloud providers _ "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" + _ "k8s.io/kubernetes/pkg/cloudprovider/providers/azure" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos" _ "k8s.io/kubernetes/pkg/cloudprovider/providers/openstack"