Merge branch 'master' into upgrade_aliases_branch

Jing Ai 2017-11-29 09:35:33 -08:00
commit 2b0681d73b
19 changed files with 633 additions and 325 deletions

View File

@@ -30,6 +30,7 @@ REGIONAL_KUBE_ADDONS=${REGIONAL_KUBE_ADDONS:-true}
 NODE_SIZE=${NODE_SIZE:-n1-standard-2}
 NUM_NODES=${NUM_NODES:-3}
 MASTER_SIZE=${MASTER_SIZE:-n1-standard-$(get-master-size)}
+MASTER_MIN_CPU_ARCHITECTURE=${MASTER_MIN_CPU_ARCHITECTURE:-} # To allow choosing better architectures.
 MASTER_DISK_TYPE=pd-ssd
 MASTER_DISK_SIZE=${MASTER_DISK_SIZE:-$(get-master-disk-size)}
 MASTER_ROOT_DISK_SIZE=${MASTER_ROOT_DISK_SIZE:-$(get-master-root-disk-size)}

View File

@@ -30,6 +30,7 @@ REGIONAL_KUBE_ADDONS=${REGIONAL_KUBE_ADDONS:-true}
 NODE_SIZE=${NODE_SIZE:-n1-standard-2}
 NUM_NODES=${NUM_NODES:-3}
 MASTER_SIZE=${MASTER_SIZE:-n1-standard-$(get-master-size)}
+MASTER_MIN_CPU_ARCHITECTURE=${MASTER_MIN_CPU_ARCHITECTURE:-} # To allow choosing better architectures.
 MASTER_DISK_TYPE=pd-ssd
 MASTER_DISK_SIZE=${MASTER_DISK_SIZE:-$(get-master-disk-size)}
 MASTER_ROOT_DISK_SIZE=${MASTER_ROOT_DISK_SIZE:-$(get-master-root-disk-size)}

View File

@@ -113,6 +113,7 @@ function create-master-instance-internal() {
     --metadata-from-file "${metadata}" \
     --disk "${disk}" \
     --boot-disk-size "${MASTER_ROOT_DISK_SIZE}" \
+    ${MASTER_MIN_CPU_ARCHITECTURE:+"--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}"} \
     ${preemptible_master} \
     ${network} 2>&1); then
     echo "${result}" >&2

View File

@@ -133,6 +133,7 @@ function create-master-instance-internal() {
     --metadata-from-file "${metadata}" \
     --disk "${disk}" \
     --boot-disk-size "${MASTER_ROOT_DISK_SIZE}" \
+    ${MASTER_MIN_CPU_ARCHITECTURE:+"--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}"} \
     ${preemptible_master} \
     ${network} 2>&1); then
     echo "${result}" >&2

View File

@@ -737,12 +737,12 @@ function kube::util::ensure-cfssl {
   kernel=$(uname -s)
   case "${kernel}" in
     Linux)
-      curl --retry 10 -s -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_linux-amd64
-      curl --retry 10 -s -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64
+      curl --retry 10 -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_linux-amd64
+      curl --retry 10 -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64
       ;;
     Darwin)
-      curl --retry 10 -s -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_darwin-amd64
-      curl --retry 10 -s -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_darwin-amd64
+      curl --retry 10 -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_darwin-amd64
+      curl --retry 10 -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_darwin-amd64
       ;;
     *)
       echo "Unknown, unsupported platform: ${kernel}." >&2

View File

@@ -2021,26 +2021,24 @@ func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, nodeName types.NodeName,
 // DetachDisk implements Volumes.DetachDisk
 func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName) (string, error) {
-	disk, err := newAWSDisk(c, diskName)
-	if err != nil {
+	diskInfo, attached, err := c.checkIfAttachedToNode(diskName, nodeName)
+
+	if diskInfo == nil {
 		return "", err
 	}
 
-	awsInstance, info, err := c.getFullInstance(nodeName)
-	if err != nil {
-		if err == cloudprovider.InstanceNotFound {
-			// If instance no longer exists, safe to assume volume is not attached.
-			glog.Warningf(
-				"Instance %q does not exist. DetachDisk will assume disk %q is not attached to it.",
-				nodeName,
-				diskName)
-			return "", nil
-		}
-
-		return "", err
+	if !attached && diskInfo.ec2Instance != nil {
+		glog.Warningf("DetachDisk %s called for node %s but volume is attached to node %s", diskName, nodeName, diskInfo.nodeName)
+		return "", nil
 	}
 
-	mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, info, disk.awsID, false)
+	if !attached {
+		return "", nil
+	}
+
+	awsInstance := newAWSInstance(c.ec2, diskInfo.ec2Instance)
+
+	mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, diskInfo.ec2Instance, diskInfo.disk.awsID, false)
 	if err != nil {
 		return "", err
 	}
@@ -2052,18 +2050,19 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
 	request := ec2.DetachVolumeInput{
 		InstanceId: &awsInstance.awsID,
-		VolumeId:   disk.awsID.awsString(),
+		VolumeId:   diskInfo.disk.awsID.awsString(),
 	}
 
 	response, err := c.ec2.DetachVolume(&request)
 	if err != nil {
-		return "", fmt.Errorf("error detaching EBS volume %q from %q: %q", disk.awsID, awsInstance.awsID, err)
+		return "", fmt.Errorf("error detaching EBS volume %q from %q: %q", diskInfo.disk.awsID, awsInstance.awsID, err)
 	}
 	if response == nil {
 		return "", errors.New("no response from DetachVolume")
 	}
 
-	attachment, err := disk.waitForAttachmentStatus("detached")
+	attachment, err := diskInfo.disk.waitForAttachmentStatus("detached")
 	if err != nil {
 		return "", err
 	}
@@ -2076,7 +2075,7 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName)
 	}
 
 	if mountDevice != "" {
-		c.endAttaching(awsInstance, disk.awsID, mountDevice)
+		c.endAttaching(awsInstance, diskInfo.disk.awsID, mountDevice)
 		// We don't check the return value - we don't really expect the attachment to have been
 		// in progress, though it might have been
 	}
@@ -2320,32 +2319,13 @@ func (c *Cloud) GetDiskPath(volumeName KubernetesVolumeID) (string, error) {
 // DiskIsAttached implements Volumes.DiskIsAttached
 func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeName) (bool, error) {
-	_, instance, err := c.getFullInstance(nodeName)
-	if err != nil {
-		if err == cloudprovider.InstanceNotFound {
-			// If instance no longer exists, safe to assume volume is not attached.
-			glog.Warningf(
-				"Instance %q does not exist. DiskIsAttached will assume disk %q is not attached to it.",
-				nodeName,
-				diskName)
-			return false, nil
-		}
-
-		return false, err
+	diskInfo, attached, err := c.checkIfAttachedToNode(diskName, nodeName)
+
+	if diskInfo == nil {
+		return true, err
 	}
 
-	diskID, err := diskName.mapToAWSVolumeID()
-	if err != nil {
-		return false, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err)
-	}
-
-	for _, blockDevice := range instance.BlockDeviceMappings {
-		id := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId))
-		if id == diskID {
-			return true, nil
-		}
-	}
-
-	return false, nil
+	return attached, nil
 }
 
 func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error) {

View File

@@ -23,6 +23,9 @@ import (
 	"strings"
 
 	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/ec2"
+	"github.com/golang/glog"
+	"k8s.io/apimachinery/pkg/types"
 )
 
 // awsVolumeRegMatch represents Regex Match for AWS volume.
@@ -46,6 +49,16 @@ func (i awsVolumeID) awsString() *string {
 //  * <awsVolumeId>
 type KubernetesVolumeID string
 
+// DiskInfo returns aws disk information in easy to use manner
+type diskInfo struct {
+	ec2Instance     *ec2.Instance
+	nodeName        types.NodeName
+	volumeState     string
+	attachmentState string
+	hasAttachment   bool
+	disk            *awsDisk
+}
+
 // mapToAWSVolumeID extracts the awsVolumeID from the KubernetesVolumeID
 func (name KubernetesVolumeID) mapToAWSVolumeID() (awsVolumeID, error) {
 	// name looks like aws://availability-zone/awsVolumeId
@@ -85,3 +98,55 @@ func (name KubernetesVolumeID) mapToAWSVolumeID() (awsVolumeID, error) {
 	return awsVolumeID(awsID), nil
 }
+
+func GetAWSVolumeID(kubeVolumeID string) (string, error) {
+	kid := KubernetesVolumeID(kubeVolumeID)
+	awsID, err := kid.mapToAWSVolumeID()
+	return string(awsID), err
+}
+
+func (c *Cloud) checkIfAttachedToNode(diskName KubernetesVolumeID, nodeName types.NodeName) (*diskInfo, bool, error) {
+	disk, err := newAWSDisk(c, diskName)
+
+	if err != nil {
+		return nil, true, err
+	}
+
+	awsDiskInfo := &diskInfo{
+		disk: disk,
+	}
+
+	info, err := disk.describeVolume()
+
+	if err != nil {
+		describeError := fmt.Errorf("Error describing volume %s with %v", diskName, err)
+		glog.Warning(describeError)
+		awsDiskInfo.volumeState = "unknown"
+		return awsDiskInfo, false, describeError
+	}
+
+	awsDiskInfo.volumeState = aws.StringValue(info.State)
+
+	if len(info.Attachments) > 0 {
+		attachment := info.Attachments[0]
+		awsDiskInfo.attachmentState = aws.StringValue(attachment.State)
+		instanceID := aws.StringValue(attachment.InstanceId)
+		instanceInfo, err := c.getInstanceByID(instanceID)
+
+		// This should never happen but if it does it could mean there was a race and instance
+		// has been deleted
+		if err != nil {
+			fetchErr := fmt.Errorf("Error fetching instance %s for volume %s", instanceID, diskName)
+			glog.Warning(fetchErr)
+			return awsDiskInfo, false, fetchErr
+		}
+
+		awsDiskInfo.ec2Instance = instanceInfo
+		awsDiskInfo.nodeName = mapInstanceToNodeName(instanceInfo)
+		awsDiskInfo.hasAttachment = true
+		if awsDiskInfo.nodeName == nodeName {
+			return awsDiskInfo, true, nil
+		}
+	}
+
+	return awsDiskInfo, false, nil
+}

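For reference, a minimal standalone sketch of the volume-ID mapping that the new GetAWSVolumeID helper exposes. It follows the comment above ("name looks like aws://availability-zone/awsVolumeId"); the function name, example IDs and parsing below are illustrative stand-ins, not the vendored cloud-provider implementation.

    package main

    import (
        "fmt"
        "strings"
    )

    // mapKubeVolumeIDToAWS mirrors what GetAWSVolumeID/mapToAWSVolumeID do for the
    // common cases: a bare "vol-..." ID, or the "aws://<zone>/<vol-id>" spec form.
    // Illustrative only; the real code lives in the AWS cloud provider package.
    func mapKubeVolumeIDToAWS(kubeVolumeID string) (string, error) {
        if !strings.HasPrefix(kubeVolumeID, "aws://") {
            return kubeVolumeID, nil // already a plain awsVolumeID
        }
        parts := strings.Split(strings.TrimPrefix(kubeVolumeID, "aws://"), "/")
        volID := parts[len(parts)-1]
        if !strings.HasPrefix(volID, "vol-") {
            return "", fmt.Errorf("invalid volume spec %q", kubeVolumeID)
        }
        return volID, nil
    }

    func main() {
        for _, id := range []string{"vol-0abcdef1234567890", "aws://us-east-1a/vol-0abcdef1234567890"} {
            awsID, err := mapKubeVolumeIDToAWS(id)
            fmt.Println(awsID, err)
        }
    }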
View File

@@ -292,8 +292,14 @@ func popMember(members []v2pools.Member, addr string, port int) []v2pools.Member
 	return members
 }
 
-func getSecurityGroupName(clusterName string, service *v1.Service) string {
-	return fmt.Sprintf("lb-sg-%s-%s-%s", clusterName, service.Namespace, service.Name)
+func getSecurityGroupName(service *v1.Service) string {
+	securityGroupName := fmt.Sprintf("lb-sg-%s-%s-%s", service.UID, service.Namespace, service.Name)
+	//OpenStack requires that the name of a security group is shorter than 255 bytes.
+	if len(securityGroupName) > 255 {
+		securityGroupName = securityGroupName[:255]
+	}
+
+	return securityGroupName
 }
 
 func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) {
@@ -868,6 +874,14 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv
 			_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
 			return status, err
 		}
+
+		// delete the old Security Group for the service
+		// Related to #53764
+		// TODO(FengyunPan): Remove it at V1.10
+		err = lbaas.EnsureOldSecurityGroupDeleted(clusterName, apiService)
+		if err != nil {
+			return status, fmt.Errorf("Failed to delete the Security Group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
+		}
 	}
 
 	return status, nil
@@ -899,7 +913,7 @@ func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Ser
 	}
 
 	// ensure security group for LB
-	lbSecGroupName := getSecurityGroupName(clusterName, apiService)
+	lbSecGroupName := getSecurityGroupName(apiService)
 	lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
 	if err != nil {
 		// check whether security group does not exist
@@ -914,8 +928,8 @@ func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Ser
 	if len(lbSecGroupID) == 0 {
 		// create security group
 		lbSecGroupCreateOpts := groups.CreateOpts{
-			Name:        getSecurityGroupName(clusterName, apiService),
-			Description: fmt.Sprintf("Securty Group for loadbalancer service %s/%s", apiService.Namespace, apiService.Name),
+			Name:        getSecurityGroupName(apiService),
+			Description: fmt.Sprintf("Security Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName),
 		}
 
 		lbSecGroup, err := groups.Create(lbaas.network, lbSecGroupCreateOpts).Extract()
@@ -1174,7 +1188,7 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *v1.Service
 	if lbaas.opts.ManageSecurityGroups {
 		err := lbaas.updateSecurityGroup(clusterName, service, nodes, loadbalancer)
 		if err != nil {
-			return fmt.Errorf("failed to update Securty Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
+			return fmt.Errorf("failed to update Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
 		}
 	}
@@ -1197,7 +1211,7 @@ func (lbaas *LbaasV2) updateSecurityGroup(clusterName string, apiService *v1.Ser
 	removals := original.Difference(current)
 
 	// Generate Name
-	lbSecGroupName := getSecurityGroupName(clusterName, apiService)
+	lbSecGroupName := getSecurityGroupName(apiService)
 	lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
 	if err != nil {
 		return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
@@ -1368,50 +1382,131 @@ func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(clusterName string, service *v1.
 	// Delete the Security Group
 	if lbaas.opts.ManageSecurityGroups {
-		// Generate Name
-		lbSecGroupName := getSecurityGroupName(clusterName, service)
-		lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
+		err := lbaas.EnsureSecurityGroupDeleted(clusterName, service)
 		if err != nil {
-			// check whether security group does not exist
-			_, ok := err.(*gophercloud.ErrResourceNotFound)
-			if ok {
-				// It is OK when the security group has been deleted by others.
-				return nil
-			} else {
-				return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
-			}
+			return fmt.Errorf("Failed to delete Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
 		}
 
-		lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID)
-		if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) {
-			return lbSecGroup.Err
-		}
-
-		if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
-			// Just happen when nodes have not Security Group, or should not happen
-			// UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty
-			// And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted.
-			glog.Warningf("Can not find node-security-group from all the nodes of this cluser when delete loadbalancer service %s/%s",
-				service.Namespace, service.Name)
-		} else {
-			// Delete the rules in the Node Security Group
-			for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
-				opts := rules.ListOpts{
-					SecGroupID:    nodeSecurityGroupID,
-					RemoteGroupID: lbSecGroupID,
-				}
-				secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
-				if err != nil && !isNotFound(err) {
-					msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
-					return fmt.Errorf(msg)
-				}
-				for _, rule := range secGroupRules {
-					res := rules.Delete(lbaas.network, rule.ID)
-					if res.Err != nil && !isNotFound(res.Err) {
-						return fmt.Errorf("error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
-					}
-				}
-			}
+		// delete the old Security Group for the service
+		// Related to #53764
+		// TODO(FengyunPan): Remove it at V1.10
+		err = lbaas.EnsureOldSecurityGroupDeleted(clusterName, service)
+		if err != nil {
+			return fmt.Errorf("Failed to delete the Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
 		}
 	}
 
 	return nil
 }
+
+// EnsureSecurityGroupDeleted deleting security group for specific loadbalancer service.
+func (lbaas *LbaasV2) EnsureSecurityGroupDeleted(clusterName string, service *v1.Service) error {
+	// Generate Name
+	lbSecGroupName := getSecurityGroupName(service)
+	lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
+	if err != nil {
+		// check whether security group does not exist
+		_, ok := err.(*gophercloud.ErrResourceNotFound)
+		if ok {
+			// It is OK when the security group has been deleted by others.
+			return nil
+		} else {
+			return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err)
+		}
+	}
+
+	lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID)
+	if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) {
+		return lbSecGroup.Err
+	}
+
+	if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
+		// Just happen when nodes have not Security Group, or should not happen
+		// UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty
+		// And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted.
+		glog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s",
+			service.Namespace, service.Name)
+	} else {
+		// Delete the rules in the Node Security Group
+		for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
+			opts := rules.ListOpts{
+				SecGroupID:    nodeSecurityGroupID,
+				RemoteGroupID: lbSecGroupID,
+			}
+			secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
+			if err != nil && !isNotFound(err) {
+				msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
+				return fmt.Errorf(msg)
+			}
+			for _, rule := range secGroupRules {
+				res := rules.Delete(lbaas.network, rule.ID)
+				if res.Err != nil && !isNotFound(res.Err) {
+					return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+// getOldSecurityGroupName is used to get the old security group name
+// Related to #53764
+// TODO(FengyunPan): Remove it at V1.10
+func getOldSecurityGroupName(clusterName string, service *v1.Service) string {
+	return fmt.Sprintf("lb-sg-%s-%v", clusterName, service.Name)
+}
+
+// EnsureOldSecurityGroupDeleted deleting old security group for specific loadbalancer service.
+// Related to #53764
+// TODO(FengyunPan): Remove it at V1.10
+func (lbaas *LbaasV2) EnsureOldSecurityGroupDeleted(clusterName string, service *v1.Service) error {
+	glog.V(4).Infof("EnsureOldSecurityGroupDeleted(%v, %v)", clusterName, service)
+	// Generate Name
+	lbSecGroupName := getOldSecurityGroupName(clusterName, service)
+	lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
+	if err != nil {
+		// check whether security group does not exist
+		_, ok := err.(*gophercloud.ErrResourceNotFound)
+		if ok {
+			// It is OK when the security group has been deleted by others.
+			return nil
+		} else {
+			return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err)
+		}
+	}
+
+	lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID)
+	if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) {
+		return lbSecGroup.Err
+	}
+
+	if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
+		// Just happen when nodes have not Security Group, or should not happen
+		// UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty
+		// And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted.
+		glog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s",
+			service.Namespace, service.Name)
+	} else {
+		// Delete the rules in the Node Security Group
+		for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
+			opts := rules.ListOpts{
+				SecGroupID:    nodeSecurityGroupID,
+				RemoteGroupID: lbSecGroupID,
+			}
+			secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
+			if err != nil && !isNotFound(err) {
+				msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
+				return fmt.Errorf(msg)
+			}
+			for _, rule := range secGroupRules {
+				res := rules.Delete(lbaas.network, rule.ID)
+				if res.Err != nil && !isNotFound(res.Err) {
+					return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
+				}
+			}
+		}
+	}
+
+	return nil
+}

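For reference, a standalone sketch of the new security-group naming scheme introduced by getSecurityGroupName above: the name is now derived from the Service UID, namespace and name and clipped to OpenStack's 255-byte limit, while getOldSecurityGroupName keeps the old cluster-name-based form only so EnsureOldSecurityGroupDeleted can clean it up. The UID and service values below are made up for illustration.

    package main

    import "fmt"

    // securityGroupName mirrors the new getSecurityGroupName: key the name on the
    // Service UID (unique per Service object) and truncate to 255 bytes.
    func securityGroupName(uid, namespace, name string) string {
        sg := fmt.Sprintf("lb-sg-%s-%s-%s", uid, namespace, name)
        if len(sg) > 255 {
            sg = sg[:255]
        }
        return sg
    }

    func main() {
        // Hypothetical UID/namespace/name, for illustration only.
        fmt.Println(securityGroupName("5f7c8a2e-1d1f-11e8-b467-0ed5f89f718b", "default", "my-service"))
    }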
View File

@@ -50,8 +50,6 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/client-go/informers:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
-        "//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
-        "//vendor/k8s.io/client-go/tools/cache:go_default_library",
         "//vendor/k8s.io/client-go/tools/record:go_default_library",
     ],
 )

View File

@@ -89,6 +89,7 @@ type serviceCache struct {
 type ServiceController struct {
 	cloud            cloudprovider.Interface
 	knownHosts       []*v1.Node
+	servicesToUpdate []*v1.Service
 	kubeClient       clientset.Interface
 	clusterName      string
 	balancer         cloudprovider.LoadBalancer
@@ -243,20 +244,6 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
 			}
 		}
 	}
 
-	if cachedService.state != nil {
-		if !s.needsUpdate(cachedService.state, service) {
-			// The service does not require an update which means it was placed on the work queue
-			// by the node sync loop and indicates that the hosts need to be updated.
-			err := s.updateLoadBalancerHosts(service)
-			if err != nil {
-				return err, cachedService.nextRetryDelay()
-			}
-			cachedService.resetRetryDelay()
-			return nil, doNotRetry
-		}
-	}
-
 	// cache the service, we need the info for service deletion
 	cachedService.state = service
 	err, retry := s.createLoadBalancerIfNeeded(key, service)
@@ -451,8 +438,6 @@ func (s *serviceCache) delete(serviceName string) {
 	delete(s.serviceMap, serviceName)
 }
 
-// needsUpdate checks to see if there were any changes between the old and new service that would require a load balancer update.
-// This method does not and should not check if the hosts have changed.
 func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
 	if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
 		return false
@@ -651,45 +636,62 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
 	}
 }
 
-// nodeSyncLoop handles adding all existing cached services to the work queue
-// to be reprocessed so that they can have their hosts updated, if any
-// host changes have occurred since the last sync loop.
+// nodeSyncLoop handles updating the hosts pointed to by all load
+// balancers whenever the set of nodes in the cluster changes.
 func (s *ServiceController) nodeSyncLoop() {
 	newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
 	if err != nil {
 		glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
 		return
 	}
 	if nodeSlicesEqualForLB(newHosts, s.knownHosts) {
-		// Nothing to do since the hosts have not changed.
+		// The set of nodes in the cluster hasn't changed, but we can retry
+		// updating any services that we failed to update last time around.
+		s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
 		return
 	}
 
-	glog.Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts))
+	glog.Infof("Detected change in list of current cluster nodes. New node set: %v",
+		nodeNames(newHosts))
 
-	for _, svc := range s.cache.allServices() {
-		s.enqueueService(svc)
-	}
+	// Try updating all services, and save the ones that fail to try again next
+	// round.
+	s.servicesToUpdate = s.cache.allServices()
+	numServices := len(s.servicesToUpdate)
+	s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
+	glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
+		numServices-len(s.servicesToUpdate), numServices)
 
+	// Update the known hosts so we can check next sync loop for changes.
 	s.knownHosts = newHosts
 }
 
-// Updates the load balancer of the service with updated nodes ONLY.
-// This method will not trigger the cloud provider to create or full update a load balancer.
-func (s *ServiceController) updateLoadBalancerHosts(service *v1.Service) error {
+// updateLoadBalancerHosts updates all existing load balancers so that
+// they will match the list of hosts provided.
+// Returns the list of services that couldn't be updated.
+func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
+	for _, service := range services {
+		func() {
+			if service == nil {
+				return
+			}
+			if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
+				glog.Errorf("External error while updating load balancer: %v.", err)
+				servicesToRetry = append(servicesToRetry, service)
+			}
+		}()
+	}
+	return servicesToRetry
+}
+
+// Updates the load balancer of a service, assuming we hold the mutex
+// associated with the service.
+func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
 	if !wantsLoadBalancer(service) {
 		return nil
 	}
 
-	hosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
-	if err != nil {
-		return err
-	}
-
 	// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
-	err = s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
+	err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
 	if err == nil {
 		// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
 		if len(hosts) == 0 {

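For reference, a minimal sketch of the retry pattern the restored updateLoadBalancerHosts gives nodeSyncLoop: process the whole batch, hand back the failures, and feed them into the next sync even when the node set is unchanged. The helper and service names below are illustrative, not the controller's API.

    package main

    import "fmt"

    // updateAll applies update to every item and returns the ones that failed,
    // mirroring how nodeSyncLoop stores the returned slice in servicesToUpdate
    // and retries only those on the next sync.
    func updateAll(items []string, update func(string) error) (retry []string) {
        for _, item := range items {
            if err := update(item); err != nil {
                retry = append(retry, item)
            }
        }
        return retry
    }

    func main() {
        pending := []string{"svc-a", "svc-b", "svc-c"}
        flaky := func(name string) error {
            if name == "svc-b" {
                return fmt.Errorf("cloud provider error")
            }
            return nil
        }
        pending = updateAll(pending, flaky) // svc-b is kept for the next round
        fmt.Println(pending)                // [svc-b]
    }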
View File

@@ -19,7 +19,6 @@ package service
 import (
 	"fmt"
 	"reflect"
-	"sort"
 	"testing"
 	"time"
 
@@ -28,8 +27,6 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
-	corelisters "k8s.io/client-go/listers/core/v1"
-	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/api/testapi"
 	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
@@ -177,45 +174,23 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
 	}
 }
 
-// newLoadBalancerNode returns a node that passes the predicate check for a
-// node to receive load balancer traffic.
-func newLoadBalancerNode(name string) *v1.Node {
-	return &v1.Node{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: name,
-		},
-		Spec: v1.NodeSpec{
-			Unschedulable: false,
-		},
-		Status: v1.NodeStatus{
-			Conditions: []v1.NodeCondition{
-				{Type: v1.NodeReady, Status: v1.ConditionTrue},
-			},
-		},
-	}
-}
-
-func sortNodesByName(nodes []*v1.Node) {
-	sort.Slice(nodes, func(i, j int) bool {
-		return nodes[i].Name < nodes[j].Name
-	})
-}
-
 // TODO: Finish converting and update comments
 func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 	nodes := []*v1.Node{
-		newLoadBalancerNode("node0"),
-		newLoadBalancerNode("node1"),
-		newLoadBalancerNode("node73"),
+		{ObjectMeta: metav1.ObjectMeta{Name: "node0"}},
+		{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
+		{ObjectMeta: metav1.ObjectMeta{Name: "node73"}},
 	}
-	sortNodesByName(nodes)
 
-	table := map[string]struct {
+	table := []struct {
 		services            []*v1.Service
 		expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall
 	}{
-		"update no load balancer": {
+		{
+			// No services present: no calls should be made.
+			services:            []*v1.Service{},
+			expectedUpdateCalls: nil,
+		},
+		{
 			// Services do not have external load balancers: no calls should be made.
 			services: []*v1.Service{
 				newService("s0", "111", v1.ServiceTypeClusterIP),
@@ -223,7 +198,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 			},
 			expectedUpdateCalls: nil,
 		},
-		"update 1 load balancer": {
+		{
 			// Services does have an external load balancer: one call should be made.
 			services: []*v1.Service{
 				newService("s0", "333", v1.ServiceTypeLoadBalancer),
@@ -232,7 +207,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 				{Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes},
 			},
 		},
-		"update 3 load balancers": {
+		{
 			// Three services have an external load balancer: three calls.
 			services: []*v1.Service{
 				newService("s0", "444", v1.ServiceTypeLoadBalancer),
@@ -245,7 +220,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 				{Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes},
 			},
 		},
-		"update 2 load balancers": {
+		{
 			// Two services have an external load balancer and two don't: two calls.
 			services: []*v1.Service{
 				newService("s0", "777", v1.ServiceTypeNodePort),
@@ -258,44 +233,30 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 				{Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes},
 			},
 		},
+		{
+			// One service has an external load balancer and one is nil: one call.
+			services: []*v1.Service{
+				newService("s0", "234", v1.ServiceTypeLoadBalancer),
+				nil,
+			},
+			expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{
+				{Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes},
+			},
+		},
 	}
-	for name, item := range table {
-		t.Run(name, func(t *testing.T) {
-			controller, cloud, _ := newController()
-
-			var services []*v1.Service
-			for _, service := range item.services {
-				services = append(services, service)
-			}
-			nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
-			for _, node := range nodes {
-				nodeIndexer.Add(node)
-			}
-			controller.nodeLister = corelisters.NewNodeLister(nodeIndexer)
-			for _, service := range services {
-				if err := controller.updateLoadBalancerHosts(service); err != nil {
-					t.Errorf("unexpected error: %v", err)
-				}
-			}
-			if len(item.expectedUpdateCalls) != len(cloud.UpdateCalls) {
-				t.Errorf("expected %d update calls but only got %d", len(item.expectedUpdateCalls), len(cloud.UpdateCalls))
-			}
-			for i, expectedCall := range item.expectedUpdateCalls {
-				actualCall := cloud.UpdateCalls[i]
-				if !reflect.DeepEqual(expectedCall.Service, actualCall.Service) {
-					t.Errorf("expected update call to contain service %+v, got %+v", expectedCall.Service, actualCall.Service)
-				}
-				sortNodesByName(actualCall.Hosts)
-				if !reflect.DeepEqual(expectedCall.Hosts, actualCall.Hosts) {
-					t.Errorf("expected update call to contain hosts %+v, got %+v", expectedCall.Hosts, actualCall.Hosts)
-				}
-			}
-		})
+	for _, item := range table {
+		controller, cloud, _ := newController()
+
+		var services []*v1.Service
+		for _, service := range item.services {
+			services = append(services, service)
+		}
+		if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
+			t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
+		}
 	}
 }
@@ -350,13 +311,6 @@ func TestProcessServiceUpdate(t *testing.T) {
 	var controller *ServiceController
 	var cloud *fakecloud.FakeCloud
 
-	nodes := []*v1.Node{
-		newLoadBalancerNode("node0"),
-		newLoadBalancerNode("node1"),
-		newLoadBalancerNode("node73"),
-	}
-	sortNodesByName(nodes)
-
 	//A pair of old and new loadbalancer IP address
 	oldLBIP := "192.168.1.1"
 	newLBIP := "192.168.1.11"
@@ -390,51 +344,6 @@ func TestProcessServiceUpdate(t *testing.T) {
 				return nil
 			},
 		},
-		{
-			testName: "If updating hosts only",
-			key:      "default/sync-test-name",
-			svc:      newService("sync-test-name", types.UID("sync-test-uid"), v1.ServiceTypeLoadBalancer),
-			updateFn: func(svc *v1.Service) *v1.Service {
-				keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName()
-				cachedServiceTest := controller.cache.getOrCreate(keyExpected)
-				cachedServiceTest.state = svc
-				controller.cache.set(keyExpected, cachedServiceTest)
-
-				// Set the nodes for the cloud's UpdateLoadBalancer call to use.
-				nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
-				for _, node := range nodes {
-					nodeIndexer.Add(node)
-				}
-				controller.nodeLister = corelisters.NewNodeLister(nodeIndexer)
-
-				// This should trigger the needsUpdate false check since the service equals the cached service
-				return svc
-			},
-			expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error {
-				if err != nil {
-					return err
-				}
-				if retryDuration != doNotRetry {
-					return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
-				}
-
-				if len(cloud.UpdateCalls) != 1 {
-					return fmt.Errorf("expected one update host call but only got %+v", cloud.UpdateCalls)
-				}
-				actualCall := cloud.UpdateCalls[0]
-				if !reflect.DeepEqual(svc, actualCall.Service) {
-					return fmt.Errorf("expected update call to contain service %+v, got %+v", svc, actualCall.Service)
-				}
-				sortNodesByName(actualCall.Hosts)
-				if !reflect.DeepEqual(nodes, actualCall.Hosts) {
-					return fmt.Errorf("expected update call to contain hosts %+v, got %+v", nodes, actualCall.Hosts)
-				}
-				return nil
-			},
-		},
 		{
 			testName: "If Updating Loadbalancer IP",
 			key:      "default/sync-test-name",
View File

@@ -206,6 +206,12 @@ const (
 	// alpha: v1.9
 	// Postpone deletion of a persistent volume claim in case it is used by a pod
 	PVCProtection utilfeature.Feature = "PVCProtection"
+
+	// owner: @aveshagarwal
+	// alpha: v1.9
+	//
+	// Enable resource limits priority function
+	ResourceLimitsPriorityFunction utilfeature.Feature = "ResourceLimitsPriorityFunction"
 )
 
 func init() {
@@ -244,6 +250,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
 	CustomPodDNS:  {Default: false, PreRelease: utilfeature.Alpha},
 	BlockVolume:   {Default: false, PreRelease: utilfeature.Alpha},
 	PVCProtection: {Default: false, PreRelease: utilfeature.Alpha},
+	ResourceLimitsPriorityFunction: {Default: false, PreRelease: utilfeature.Alpha},
 
 	// inherited features from generic apiserver, relisted here to get a conflict if it is changed
 	// unintentionally on either side:

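For reference, a minimal sketch of how the new alpha gate is meant to be consumed (the real check appears in the scheduler defaults change at the bottom of this commit): the gate defaults to false, and operators opt in with --feature-gates=ResourceLimitsPriorityFunction=true on the scheduler. The map and registry below are illustrative stand-ins for the utilfeature plumbing, not the real API.

    package main

    import "fmt"

    // A stand-in gate table: alpha gates are off by default, exactly as the
    // defaultKubernetesFeatureGates entry above declares.
    var featureGates = map[string]bool{
        "ResourceLimitsPriorityFunction": false,
    }

    var registeredPriorities []string

    func registerPriority(name string) {
        registeredPriorities = append(registeredPriorities, name)
    }

    func main() {
        // Gate-guarded registration, mirroring the pattern used in the
        // scheduler defaults: the priority function only exists when the
        // gate has been flipped on.
        if featureGates["ResourceLimitsPriorityFunction"] {
            registerPriority("ResourceLimitsPriority")
        }
        fmt.Println(registeredPriorities) // [] unless the gate is enabled
    }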
View File

@@ -410,7 +410,7 @@ func (f *FakeDockerClient) ListContainers(options dockertypes.ContainerListOptio
 		var filtered []dockertypes.Container
 		for _, container := range containerList {
 			for _, statusFilter := range statusFilters {
-				if container.Status == statusFilter {
+				if toDockerContainerStatus(container.Status) == statusFilter {
 					filtered = append(filtered, container)
 					break
 				}
@@ -443,6 +443,19 @@ func (f *FakeDockerClient) ListContainers(options dockertypes.ContainerListOptio
 	return containerList, err
 }
 
+func toDockerContainerStatus(state string) string {
+	switch {
+	case strings.HasPrefix(state, StatusCreatedPrefix):
+		return "created"
+	case strings.HasPrefix(state, StatusRunningPrefix):
+		return "running"
+	case strings.HasPrefix(state, StatusExitedPrefix):
+		return "exited"
+	default:
+		return "unknown"
+	}
+}
+
 // InspectContainer is a test-spy implementation of Interface.InspectContainer.
 // It adds an entry "inspect" to the internal method call record.
 func (f *FakeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) {
@@ -565,6 +578,18 @@ func (f *FakeDockerClient) StartContainer(id string) error {
 	}
 	f.appendContainerTrace("Started", id)
 	container, ok := f.ContainerMap[id]
+	if container.HostConfig.NetworkMode.IsContainer() {
+		hostContainerID := container.HostConfig.NetworkMode.ConnectedContainer()
+		found := false
+		for _, container := range f.RunningContainerList {
+			if container.ID == hostContainerID {
+				found = true
+			}
+		}
+		if !found {
+			return fmt.Errorf("failed to start container \"%s\": Error response from daemon: cannot join network of a non running container: %s", id, hostContainerID)
+		}
+	}
 	timestamp := f.Clock.Now()
 	if !ok {
 		container = convertFakeContainer(&FakeContainer{ID: id, Name: id, CreatedAt: timestamp})

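For reference, a standalone sketch of what the new toDockerContainerStatus helper does: it maps the fake client's human-readable status strings onto the docker API states that ListContainers filters on. The "Created"/"Up"/"Exited" prefixes below are assumptions standing in for this package's Status*Prefix constants.

    package main

    import (
        "fmt"
        "strings"
    )

    // toStatus is an illustrative copy of the helper's logic with hard-coded
    // prefixes; the real function uses the fake client's prefix constants.
    func toStatus(state string) string {
        switch {
        case strings.HasPrefix(state, "Created"):
            return "created"
        case strings.HasPrefix(state, "Up"):
            return "running"
        case strings.HasPrefix(state, "Exited"):
            return "exited"
        default:
            return "unknown"
        }
    }

    func main() {
        fmt.Println(toStatus("Up 3 minutes"))      // running
        fmt.Println(toStatus("Exited (0) 5s ago")) // exited
    }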
View File

@@ -256,21 +256,7 @@ func (plugin *awsElasticBlockStorePlugin) NewDetacher() (volume.Detacher, error)
 func (detacher *awsElasticBlockStoreDetacher) Detach(volumeName string, nodeName types.NodeName) error {
 	volumeID := aws.KubernetesVolumeID(path.Base(volumeName))
 
-	attached, err := detacher.awsVolumes.DiskIsAttached(volumeID, nodeName)
-	if err != nil {
-		// Log error and continue with detach
-		glog.Errorf(
-			"Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v",
-			volumeID, nodeName, err)
-	}
-
-	if err == nil && !attached {
-		// Volume is already detached from node.
-		glog.Infof("detach operation was successful. volume %q is already detached from node %q.", volumeID, nodeName)
-		return nil
-	}
-
-	if _, err = detacher.awsVolumes.DetachDisk(volumeID, nodeName); err != nil {
+	if _, err := detacher.awsVolumes.DetachDisk(volumeID, nodeName); err != nil {
 		glog.Errorf("Error detaching volumeID %q: %v", volumeID, err)
 		return err
 	}

View File

@@ -62,10 +62,9 @@ func TestGetVolumeName_PersistentVolume(t *testing.T) {
 type testcase struct {
 	name aws.KubernetesVolumeID
 
 	// For fake AWS:
-	attach         attachCall
-	detach         detachCall
-	diskIsAttached diskIsAttachedCall
-	t              *testing.T
+	attach attachCall
+	detach detachCall
+	t      *testing.T
 
 	// Actual test to run
 	test func(test *testcase) (string, error)
@@ -81,7 +80,6 @@ func TestAttachDetach(t *testing.T) {
 	spec := createVolSpec(diskName, readOnly)
 	attachError := errors.New("Fake attach error")
 	detachError := errors.New("Fake detach error")
-	diskCheckError := errors.New("Fake DiskIsAttached error")
 	tests := []testcase{
 		// Successful Attach call
 		{
@@ -107,44 +105,18 @@ func TestAttachDetach(t *testing.T) {
 		// Detach succeeds
 		{
 			name:   "Detach_Positive",
-			diskIsAttached: diskIsAttachedCall{diskName, nodeName, true, nil},
-			detach:         detachCall{diskName, nodeName, "/dev/sda", nil},
+			detach: detachCall{diskName, nodeName, "/dev/sda", nil},
 			test: func(testcase *testcase) (string, error) {
 				detacher := newDetacher(testcase)
 				mountPath := "/mnt/" + string(diskName)
 				return "", detacher.Detach(mountPath, nodeName)
 			},
 		},
 
-		// Disk is already detached
-		{
-			name:           "Detach_Positive_AlreadyDetached",
-			diskIsAttached: diskIsAttachedCall{diskName, nodeName, false, nil},
-			test: func(testcase *testcase) (string, error) {
-				detacher := newDetacher(testcase)
-				mountPath := "/mnt/" + string(diskName)
-				return "", detacher.Detach(mountPath, nodeName)
-			},
-		},
-
-		// Detach succeeds when DiskIsAttached fails
-		{
-			name:           "Detach_Positive_CheckFails",
-			diskIsAttached: diskIsAttachedCall{diskName, nodeName, false, diskCheckError},
-			detach:         detachCall{diskName, nodeName, "/dev/sda", nil},
-			test: func(testcase *testcase) (string, error) {
-				detacher := newDetacher(testcase)
-				mountPath := "/mnt/" + string(diskName)
-				return "", detacher.Detach(mountPath, nodeName)
-			},
-		},
-
 		// Detach fails
 		{
 			name:   "Detach_Negative",
-			diskIsAttached: diskIsAttachedCall{diskName, nodeName, false, diskCheckError},
-			detach:         detachCall{diskName, nodeName, "", detachError},
+			detach: detachCall{diskName, nodeName, "", detachError},
 			test: func(testcase *testcase) (string, error) {
 				detacher := newDetacher(testcase)
 				mountPath := "/mnt/" + string(diskName)
@@ -298,28 +270,8 @@ func (testcase *testcase) DetachDisk(diskName aws.KubernetesVolumeID, nodeName t
 }
 
 func (testcase *testcase) DiskIsAttached(diskName aws.KubernetesVolumeID, nodeName types.NodeName) (bool, error) {
-	expected := &testcase.diskIsAttached
-
-	if expected.diskName == "" && expected.nodeName == "" {
-		// testcase.diskIsAttached looks uninitialized, test did not expect to
-		// call DiskIsAttached
-		testcase.t.Errorf("Unexpected DiskIsAttached call!")
-		return false, errors.New("Unexpected DiskIsAttached call!")
-	}
-
-	if expected.diskName != diskName {
-		testcase.t.Errorf("Unexpected DiskIsAttached call: expected diskName %s, got %s", expected.diskName, diskName)
-		return false, errors.New("Unexpected DiskIsAttached call: wrong diskName")
-	}
-
-	if expected.nodeName != nodeName {
-		testcase.t.Errorf("Unexpected DiskIsAttached call: expected nodeName %s, got %s", expected.nodeName, nodeName)
-		return false, errors.New("Unexpected DiskIsAttached call: wrong nodeName")
-	}
-
-	glog.V(4).Infof("DiskIsAttached call: %s, %s, returning %v, %v", diskName, nodeName, expected.isAttached, expected.ret)
-
-	return expected.isAttached, expected.ret
+	// DetachDisk no longer relies on DiskIsAttached api call
+	return false, nil
 }
 
 func (testcase *testcase) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) {

View File

@@ -19,6 +19,7 @@ go_library(
         "node_label.go",
         "node_prefer_avoid_pods.go",
         "reduce.go",
+        "resource_limits.go",
         "selector_spreading.go",
         "taint_toleration.go",
         "test_util.go",
@@ -54,6 +55,7 @@ go_test(
         "node_affinity_test.go",
         "node_label_test.go",
         "node_prefer_avoid_pods_test.go",
+        "resource_limits_test.go",
         "selector_spreading_test.go",
         "taint_toleration_test.go",
     ],

View File

@ -0,0 +1,128 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"fmt"
"k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
)
// ResourceLimitsPriorityMap is a priority function that increases score of input node by 1 if the node satisfies
// input pod's resource limits. In detail, this priority function works as follows: If a node does not publish its
// allocatable resources (cpu and memory both), the node score is not affected. If a pod does not specify
// its cpu and memory limits both, the node score is not affected. If one or both of cpu and memory limits
// of the pod are satisfied, the node is assigned a score of 1.
// Rationale of choosing the lowest score of 1 is that this is mainly selected to break ties between nodes that have
// same scores assigned by one of least and most requested priority functions.
func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) {
node := nodeInfo.Node()
if node == nil {
return schedulerapi.HostPriority{}, fmt.Errorf("node not found")
}
allocatableResources := nodeInfo.AllocatableResource()
// compute pod limits
podLimits := getResourceLimits(pod)
cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU)
memScore := computeScore(podLimits.Memory, allocatableResources.Memory)
score := int(0)
if cpuScore == 1 || memScore == 1 {
score = 1
}
if glog.V(10) {
// We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is
// not logged. There is visible performance gain from it.
glog.Infof(
"%v -> %v: Resource Limits Priority, allocatable %d millicores %d memory bytes, pod limits %d millicores %d memory bytes, score %d",
pod.Name, node.Name,
allocatableResources.MilliCPU, allocatableResources.Memory,
podLimits.MilliCPU, podLimits.Memory,
score,
)
}
return schedulerapi.HostPriority{
Host: node.Name,
Score: score,
}, nil
}
// computeScore return 1 if limit value is less than or equal to allocable
// value, otherwise it returns 0.
func computeScore(limit, allocatable int64) int64 {
if limit != 0 && allocatable != 0 && limit <= allocatable {
return 1
}
return 0
}
// getResourceLimits computes resource limits for input pod.
// The reason to create this new function is to be consistent with other
// priority functions because most or perhaps all priority functions work
// with schedulercache.Resource.
// TODO: cache it as part of metadata passed to priority functions.
func getResourceLimits(pod *v1.Pod) *schedulercache.Resource {
result := &schedulercache.Resource{}
for _, container := range pod.Spec.Containers {
result.Add(container.Resources.Limits)
}
// take max_resource(sum_pod, any_init_container)
for _, container := range pod.Spec.InitContainers {
for rName, rQuantity := range container.Resources.Limits {
switch rName {
case v1.ResourceMemory:
if mem := rQuantity.Value(); mem > result.Memory {
result.Memory = mem
}
case v1.ResourceCPU:
if cpu := rQuantity.MilliValue(); cpu > result.MilliCPU {
result.MilliCPU = cpu
}
// keeping these resources though score computation in other priority functions and in this
// are only computed based on cpu and memory only.
case v1.ResourceEphemeralStorage:
if ephemeralStorage := rQuantity.Value(); ephemeralStorage > result.EphemeralStorage {
result.EphemeralStorage = ephemeralStorage
}
case v1.ResourceNvidiaGPU:
if gpu := rQuantity.Value(); gpu > result.NvidiaGPU {
result.NvidiaGPU = gpu
}
default:
if v1helper.IsScalarResourceName(rName) {
value := rQuantity.Value()
if value > result.ScalarResources[rName] {
result.SetScalar(rName, value)
}
}
}
}
}
return result
}

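For reference, a worked example of the scoring rule implemented by ResourceLimitsPriorityMap above, using a standalone copy of computeScore: a node gets score 1 only when at least one of the pod's cpu/memory limits is set and fits inside the node's allocatable value. The numbers below are illustrative, in the same spirit as the cases in the test file that follows.

    package main

    import "fmt"

    // computeScore reproduces the helper above: 1 when the limit is set and
    // fits within the allocatable value, otherwise 0.
    func computeScore(limit, allocatable int64) int64 {
        if limit != 0 && allocatable != 0 && limit <= allocatable {
            return 1
        }
        return 0
    }

    func main() {
        podCPU, podMem := int64(3000), int64(0)       // pod limits: millicores, bytes
        nodeCPU, nodeMem := int64(2000), int64(10000) // node allocatable

        cpuScore := computeScore(podCPU, nodeCPU) // 0: limit exceeds allocatable
        memScore := computeScore(podMem, nodeMem) // 0: no memory limit set

        score := 0
        if cpuScore == 1 || memScore == 1 {
            score = 1
        }
        fmt.Println(score) // 0 - this node does not satisfy the pod's limits
    }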
View File

@ -0,0 +1,151 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package priorities
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
//metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
func TestResourceLimistPriority(t *testing.T) {
noResources := v1.PodSpec{
Containers: []v1.Container{},
}
cpuOnly := v1.PodSpec{
NodeName: "machine1",
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("0"),
},
},
},
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceMemory: resource.MustParse("0"),
},
},
},
},
}
memOnly := v1.PodSpec{
NodeName: "machine2",
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0"),
v1.ResourceMemory: resource.MustParse("2000"),
},
},
},
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("0"),
v1.ResourceMemory: resource.MustParse("3000"),
},
},
},
},
}
cpuAndMemory := v1.PodSpec{
NodeName: "machine2",
Containers: []v1.Container{
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1000m"),
v1.ResourceMemory: resource.MustParse("2000"),
},
},
},
{
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("2000m"),
v1.ResourceMemory: resource.MustParse("3000"),
},
},
},
},
}
tests := []struct {
// input pod
pod *v1.Pod
nodes []*v1.Node
expectedList schedulerapi.HostPriorityList
test string
}{
{
pod: &v1.Pod{Spec: noResources},
nodes: []*v1.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 4000, 0), makeNode("machine3", 0, 10000), makeNode("machine4", 0, 0)},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}, {Host: "machine3", Score: 0}, {Host: "machine4", Score: 0}},
test: "pod does not specify its resource limits",
},
{
pod: &v1.Pod{Spec: cpuOnly},
nodes: []*v1.Node{makeNode("machine1", 3000, 10000), makeNode("machine2", 2000, 10000)},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 0}},
test: "pod only specifies cpu limits",
},
{
pod: &v1.Pod{Spec: memOnly},
nodes: []*v1.Node{makeNode("machine1", 4000, 4000), makeNode("machine2", 5000, 10000)},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 1}},
test: "pod only specifies mem limits",
},
{
pod: &v1.Pod{Spec: cpuAndMemory},
nodes: []*v1.Node{makeNode("machine1", 4000, 4000), makeNode("machine2", 5000, 10000)},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 1}},
test: "pod specifies both cpu and mem limits",
},
{
pod: &v1.Pod{Spec: cpuAndMemory},
nodes: []*v1.Node{makeNode("machine1", 0, 0)},
expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}},
test: "node does not advertise its allocatables",
},
}
for _, test := range tests {
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes)
list, err := priorityFunction(ResourceLimitsPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !reflect.DeepEqual(test.expectedList, list) {
t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list)
}
}
}

View File

@@ -106,6 +106,10 @@ func init() {
 	factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1)
 	// Optional, cluster-autoscaler friendly priority function - give used nodes higher priority.
 	factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1)
+	// Prioritizes nodes that satisfy pod's resource limits
+	if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) {
+		factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1)
+	}
 }
 
 func defaultPredicates() sets.String {