diff --git a/cluster/gce/config-default.sh b/cluster/gce/config-default.sh index b7adee87c20..eb0eb611ea3 100755 --- a/cluster/gce/config-default.sh +++ b/cluster/gce/config-default.sh @@ -30,6 +30,7 @@ REGIONAL_KUBE_ADDONS=${REGIONAL_KUBE_ADDONS:-true} NODE_SIZE=${NODE_SIZE:-n1-standard-2} NUM_NODES=${NUM_NODES:-3} MASTER_SIZE=${MASTER_SIZE:-n1-standard-$(get-master-size)} +MASTER_MIN_CPU_ARCHITECTURE=${MASTER_MIN_CPU_ARCHITECTURE:-} # To allow choosing better architectures. MASTER_DISK_TYPE=pd-ssd MASTER_DISK_SIZE=${MASTER_DISK_SIZE:-$(get-master-disk-size)} MASTER_ROOT_DISK_SIZE=${MASTER_ROOT_DISK_SIZE:-$(get-master-root-disk-size)} diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index f9f223e1e9c..12a45ddad8d 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -30,6 +30,7 @@ REGIONAL_KUBE_ADDONS=${REGIONAL_KUBE_ADDONS:-true} NODE_SIZE=${NODE_SIZE:-n1-standard-2} NUM_NODES=${NUM_NODES:-3} MASTER_SIZE=${MASTER_SIZE:-n1-standard-$(get-master-size)} +MASTER_MIN_CPU_ARCHITECTURE=${MASTER_MIN_CPU_ARCHITECTURE:-} # To allow choosing better architectures. MASTER_DISK_TYPE=pd-ssd MASTER_DISK_SIZE=${MASTER_DISK_SIZE:-$(get-master-disk-size)} MASTER_ROOT_DISK_SIZE=${MASTER_ROOT_DISK_SIZE:-$(get-master-root-disk-size)} diff --git a/cluster/gce/container-linux/master-helper.sh b/cluster/gce/container-linux/master-helper.sh index f8177fd308f..84e86a03502 100755 --- a/cluster/gce/container-linux/master-helper.sh +++ b/cluster/gce/container-linux/master-helper.sh @@ -113,6 +113,7 @@ function create-master-instance-internal() { --metadata-from-file "${metadata}" \ --disk "${disk}" \ --boot-disk-size "${MASTER_ROOT_DISK_SIZE}" \ + ${MASTER_MIN_CPU_ARCHITECTURE:+"--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}"} \ ${preemptible_master} \ ${network} 2>&1); then echo "${result}" >&2 diff --git a/cluster/gce/gci/master-helper.sh b/cluster/gce/gci/master-helper.sh index b93bd86ab8c..0efab2bf8d1 100755 --- a/cluster/gce/gci/master-helper.sh +++ b/cluster/gce/gci/master-helper.sh @@ -133,6 +133,7 @@ function create-master-instance-internal() { --metadata-from-file "${metadata}" \ --disk "${disk}" \ --boot-disk-size "${MASTER_ROOT_DISK_SIZE}" \ + ${MASTER_MIN_CPU_ARCHITECTURE:+"--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}"} \ ${preemptible_master} \ ${network} 2>&1); then echo "${result}" >&2 diff --git a/hack/lib/util.sh b/hack/lib/util.sh index 69410a7fd64..1114cca8902 100755 --- a/hack/lib/util.sh +++ b/hack/lib/util.sh @@ -737,12 +737,12 @@ function kube::util::ensure-cfssl { kernel=$(uname -s) case "${kernel}" in Linux) - curl --retry 10 -s -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_linux-amd64 - curl --retry 10 -s -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64 + curl --retry 10 -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_linux-amd64 + curl --retry 10 -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64 ;; Darwin) - curl --retry 10 -s -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_darwin-amd64 - curl --retry 10 -s -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_darwin-amd64 + curl --retry 10 -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_darwin-amd64 + curl --retry 10 -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_darwin-amd64 ;; *) echo "Unknown, unsupported platform: ${kernel}." 
>&2 diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index c25b0f58ea2..5f8d0ab36cf 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -2021,26 +2021,24 @@ func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, nodeName types.NodeName, // DetachDisk implements Volumes.DetachDisk func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName) (string, error) { - disk, err := newAWSDisk(c, diskName) - if err != nil { + diskInfo, attached, err := c.checkIfAttachedToNode(diskName, nodeName) + + if diskInfo == nil { return "", err } - awsInstance, info, err := c.getFullInstance(nodeName) - if err != nil { - if err == cloudprovider.InstanceNotFound { - // If instance no longer exists, safe to assume volume is not attached. - glog.Warningf( - "Instance %q does not exist. DetachDisk will assume disk %q is not attached to it.", - nodeName, - diskName) - return "", nil - } - - return "", err + if !attached && diskInfo.ec2Instance != nil { + glog.Warningf("DetachDisk %s called for node %s but volume is attached to node %s", diskName, nodeName, diskInfo.nodeName) + return "", nil } - mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, info, disk.awsID, false) + if !attached { + return "", nil + } + + awsInstance := newAWSInstance(c.ec2, diskInfo.ec2Instance) + + mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, diskInfo.ec2Instance, diskInfo.disk.awsID, false) if err != nil { return "", err } @@ -2052,18 +2050,19 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName) request := ec2.DetachVolumeInput{ InstanceId: &awsInstance.awsID, - VolumeId: disk.awsID.awsString(), + VolumeId: diskInfo.disk.awsID.awsString(), } response, err := c.ec2.DetachVolume(&request) if err != nil { - return "", fmt.Errorf("error detaching EBS volume %q from %q: %q", disk.awsID, awsInstance.awsID, err) + return "", fmt.Errorf("error detaching EBS volume %q from %q: %q", diskInfo.disk.awsID, awsInstance.awsID, err) } + if response == nil { return "", errors.New("no response from DetachVolume") } - attachment, err := disk.waitForAttachmentStatus("detached") + attachment, err := diskInfo.disk.waitForAttachmentStatus("detached") if err != nil { return "", err } @@ -2076,7 +2075,7 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName) } if mountDevice != "" { - c.endAttaching(awsInstance, disk.awsID, mountDevice) + c.endAttaching(awsInstance, diskInfo.disk.awsID, mountDevice) // We don't check the return value - we don't really expect the attachment to have been // in progress, though it might have been } @@ -2320,32 +2319,13 @@ func (c *Cloud) GetDiskPath(volumeName KubernetesVolumeID) (string, error) { // DiskIsAttached implements Volumes.DiskIsAttached func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeName) (bool, error) { - _, instance, err := c.getFullInstance(nodeName) - if err != nil { - if err == cloudprovider.InstanceNotFound { - // If instance no longer exists, safe to assume volume is not attached. - glog.Warningf( - "Instance %q does not exist. 
DiskIsAttached will assume disk %q is not attached to it.", - nodeName, - diskName) - return false, nil - } + diskInfo, attached, err := c.checkIfAttachedToNode(diskName, nodeName) - return false, err + if diskInfo == nil { + return true, err } - diskID, err := diskName.mapToAWSVolumeID() - if err != nil { - return false, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err) - } - - for _, blockDevice := range instance.BlockDeviceMappings { - id := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId)) - if id == diskID { - return true, nil - } - } - return false, nil + return attached, nil } func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error) { diff --git a/pkg/cloudprovider/providers/aws/volumes.go b/pkg/cloudprovider/providers/aws/volumes.go index c67f080fec1..3a4aa6284eb 100644 --- a/pkg/cloudprovider/providers/aws/volumes.go +++ b/pkg/cloudprovider/providers/aws/volumes.go @@ -23,6 +23,9 @@ import ( "strings" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" ) // awsVolumeRegMatch represents Regex Match for AWS volume. @@ -46,6 +49,16 @@ func (i awsVolumeID) awsString() *string { // * type KubernetesVolumeID string +// DiskInfo returns aws disk information in easy to use manner +type diskInfo struct { + ec2Instance *ec2.Instance + nodeName types.NodeName + volumeState string + attachmentState string + hasAttachment bool + disk *awsDisk +} + // mapToAWSVolumeID extracts the awsVolumeID from the KubernetesVolumeID func (name KubernetesVolumeID) mapToAWSVolumeID() (awsVolumeID, error) { // name looks like aws://availability-zone/awsVolumeId @@ -85,3 +98,55 @@ func (name KubernetesVolumeID) mapToAWSVolumeID() (awsVolumeID, error) { return awsVolumeID(awsID), nil } + +func GetAWSVolumeID(kubeVolumeID string) (string, error) { + kid := KubernetesVolumeID(kubeVolumeID) + awsID, err := kid.mapToAWSVolumeID() + return string(awsID), err +} + +func (c *Cloud) checkIfAttachedToNode(diskName KubernetesVolumeID, nodeName types.NodeName) (*diskInfo, bool, error) { + disk, err := newAWSDisk(c, diskName) + + if err != nil { + return nil, true, err + } + + awsDiskInfo := &diskInfo{ + disk: disk, + } + + info, err := disk.describeVolume() + + if err != nil { + describeError := fmt.Errorf("Error describing volume %s with %v", diskName, err) + glog.Warning(describeError) + awsDiskInfo.volumeState = "unknown" + return awsDiskInfo, false, describeError + } + + awsDiskInfo.volumeState = aws.StringValue(info.State) + + if len(info.Attachments) > 0 { + attachment := info.Attachments[0] + awsDiskInfo.attachmentState = aws.StringValue(attachment.State) + instanceID := aws.StringValue(attachment.InstanceId) + instanceInfo, err := c.getInstanceByID(instanceID) + + // This should never happen but if it does it could mean there was a race and instance + // has been deleted + if err != nil { + fetchErr := fmt.Errorf("Error fetching instance %s for volume %s", instanceID, diskName) + glog.Warning(fetchErr) + return awsDiskInfo, false, fetchErr + } + + awsDiskInfo.ec2Instance = instanceInfo + awsDiskInfo.nodeName = mapInstanceToNodeName(instanceInfo) + awsDiskInfo.hasAttachment = true + if awsDiskInfo.nodeName == nodeName { + return awsDiskInfo, true, nil + } + } + return awsDiskInfo, false, nil +} diff --git a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go 
b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go index 98b5b44ed77..471fdc31a61 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go +++ b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go @@ -292,8 +292,14 @@ func popMember(members []v2pools.Member, addr string, port int) []v2pools.Member return members } -func getSecurityGroupName(clusterName string, service *v1.Service) string { - return fmt.Sprintf("lb-sg-%s-%s-%s", clusterName, service.Namespace, service.Name) +func getSecurityGroupName(service *v1.Service) string { + securityGroupName := fmt.Sprintf("lb-sg-%s-%s-%s", service.UID, service.Namespace, service.Name) + //OpenStack requires that the name of a security group is shorter than 255 bytes. + if len(securityGroupName) > 255 { + securityGroupName = securityGroupName[:255] + } + + return securityGroupName } func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) { @@ -868,6 +874,14 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) return status, err } + + // delete the old Security Group for the service + // Related to #53764 + // TODO(FengyunPan): Remove it at V1.10 + err = lbaas.EnsureOldSecurityGroupDeleted(clusterName, apiService) + if err != nil { + return status, fmt.Errorf("Failed to delete the Security Group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err) + } } return status, nil @@ -899,7 +913,7 @@ func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Ser } // ensure security group for LB - lbSecGroupName := getSecurityGroupName(clusterName, apiService) + lbSecGroupName := getSecurityGroupName(apiService) lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) if err != nil { // check whether security group does not exist @@ -914,8 +928,8 @@ func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Ser if len(lbSecGroupID) == 0 { // create security group lbSecGroupCreateOpts := groups.CreateOpts{ - Name: getSecurityGroupName(clusterName, apiService), - Description: fmt.Sprintf("Securty Group for loadbalancer service %s/%s", apiService.Namespace, apiService.Name), + Name: getSecurityGroupName(apiService), + Description: fmt.Sprintf("Security Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName), } lbSecGroup, err := groups.Create(lbaas.network, lbSecGroupCreateOpts).Extract() @@ -1174,7 +1188,7 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *v1.Service if lbaas.opts.ManageSecurityGroups { err := lbaas.updateSecurityGroup(clusterName, service, nodes, loadbalancer) if err != nil { - return fmt.Errorf("failed to update Securty Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err) + return fmt.Errorf("failed to update Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err) } } @@ -1197,7 +1211,7 @@ func (lbaas *LbaasV2) updateSecurityGroup(clusterName string, apiService *v1.Ser removals := original.Difference(current) // Generate Name - lbSecGroupName := getSecurityGroupName(clusterName, apiService) + lbSecGroupName := getSecurityGroupName(apiService) lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) if err != nil { return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err) @@ -1368,50 +1382,131 @@ func 
(lbaas *LbaasV2) EnsureLoadBalancerDeleted(clusterName string, service *v1. // Delete the Security Group if lbaas.opts.ManageSecurityGroups { - // Generate Name - lbSecGroupName := getSecurityGroupName(clusterName, service) - lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) + err := lbaas.EnsureSecurityGroupDeleted(clusterName, service) if err != nil { - // check whether security group does not exist - _, ok := err.(*gophercloud.ErrResourceNotFound) - if ok { - // It is OK when the security group has been deleted by others. - return nil - } else { - return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err) - } + return fmt.Errorf("Failed to delete Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err) } - lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID) - if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) { - return lbSecGroup.Err + // delete the old Security Group for the service + // Related to #53764 + // TODO(FengyunPan): Remove it at V1.10 + err = lbaas.EnsureOldSecurityGroupDeleted(clusterName, service) + if err != nil { + return fmt.Errorf("Failed to delete the Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err) } + } - if len(lbaas.opts.NodeSecurityGroupIDs) == 0 { - // Just happen when nodes have not Security Group, or should not happen - // UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty - // And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted. - glog.Warningf("Can not find node-security-group from all the nodes of this cluser when delete loadbalancer service %s/%s", - service.Namespace, service.Name) + return nil +} + +// EnsureSecurityGroupDeleted deleting security group for specific loadbalancer service. +func (lbaas *LbaasV2) EnsureSecurityGroupDeleted(clusterName string, service *v1.Service) error { + // Generate Name + lbSecGroupName := getSecurityGroupName(service) + lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) + if err != nil { + // check whether security group does not exist + _, ok := err.(*gophercloud.ErrResourceNotFound) + if ok { + // It is OK when the security group has been deleted by others. 
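Both deletion helpers in this hunk, EnsureSecurityGroupDeleted and the EnsureOldSecurityGroupDeleted variant further down, resolve the group name first and treat a not-found result as success so that deletion stays idempotent. A minimal standalone sketch of that flow, assuming stand-in lookup and delete callbacks rather than the real gophercloud client calls:

package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for gophercloud.ErrResourceNotFound in this sketch.
var errNotFound = errors.New("resource not found")

// ensureDeleted resolves a security group by name and deletes it, treating
// "not found" at either step as success: someone else already removed it.
func ensureDeleted(name string, lookup func(string) (string, error), del func(string) error) error {
	id, err := lookup(name)
	if errors.Is(err, errNotFound) {
		return nil
	}
	if err != nil {
		return fmt.Errorf("finding security group %s: %v", name, err)
	}
	if err := del(id); err != nil && !errors.Is(err, errNotFound) {
		return err
	}
	return nil
}

func main() {
	lookup := func(string) (string, error) { return "", errNotFound }
	del := func(string) error { return nil }
	fmt.Println(ensureDeleted("lb-sg-uid-ns-name", lookup, del)) // <nil>: already gone is fine
}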
+ return nil } else { - // Delete the rules in the Node Security Group - for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs { - opts := rules.ListOpts{ - SecGroupID: nodeSecurityGroupID, - RemoteGroupID: lbSecGroupID, - } - secGroupRules, err := getSecurityGroupRules(lbaas.network, opts) + return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err) + } + } - if err != nil && !isNotFound(err) { - msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err) - return fmt.Errorf(msg) - } + lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID) + if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) { + return lbSecGroup.Err + } - for _, rule := range secGroupRules { - res := rules.Delete(lbaas.network, rule.ID) - if res.Err != nil && !isNotFound(res.Err) { - return fmt.Errorf("error occurred deleting security group rule: %s: %v", rule.ID, res.Err) - } + if len(lbaas.opts.NodeSecurityGroupIDs) == 0 { + // Just happen when nodes have not Security Group, or should not happen + // UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty + // And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted. + glog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s", + service.Namespace, service.Name) + } else { + // Delete the rules in the Node Security Group + for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs { + opts := rules.ListOpts{ + SecGroupID: nodeSecurityGroupID, + RemoteGroupID: lbSecGroupID, + } + secGroupRules, err := getSecurityGroupRules(lbaas.network, opts) + + if err != nil && !isNotFound(err) { + msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err) + return fmt.Errorf(msg) + } + + for _, rule := range secGroupRules { + res := rules.Delete(lbaas.network, rule.ID) + if res.Err != nil && !isNotFound(res.Err) { + return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err) + } + } + } + } + + return nil +} + +// getOldSecurityGroupName is used to get the old security group name +// Related to #53764 +// TODO(FengyunPan): Remove it at V1.10 +func getOldSecurityGroupName(clusterName string, service *v1.Service) string { + return fmt.Sprintf("lb-sg-%s-%v", clusterName, service.Name) +} + +// EnsureOldSecurityGroupDeleted deleting old security group for specific loadbalancer service. +// Related to #53764 +// TODO(FengyunPan): Remove it at V1.10 +func (lbaas *LbaasV2) EnsureOldSecurityGroupDeleted(clusterName string, service *v1.Service) error { + glog.V(4).Infof("EnsureOldSecurityGroupDeleted(%v, %v)", clusterName, service) + // Generate Name + lbSecGroupName := getOldSecurityGroupName(clusterName, service) + lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) + if err != nil { + // check whether security group does not exist + _, ok := err.(*gophercloud.ErrResourceNotFound) + if ok { + // It is OK when the security group has been deleted by others. 
+ return nil + } else { + return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err) + } + } + + lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID) + if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) { + return lbSecGroup.Err + } + + if len(lbaas.opts.NodeSecurityGroupIDs) == 0 { + // Just happen when nodes have not Security Group, or should not happen + // UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty + // And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted. + glog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s", + service.Namespace, service.Name) + } else { + // Delete the rules in the Node Security Group + for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs { + opts := rules.ListOpts{ + SecGroupID: nodeSecurityGroupID, + RemoteGroupID: lbSecGroupID, + } + secGroupRules, err := getSecurityGroupRules(lbaas.network, opts) + + if err != nil && !isNotFound(err) { + msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err) + return fmt.Errorf(msg) + } + + for _, rule := range secGroupRules { + res := rules.Delete(lbaas.network, rule.ID) + if res.Err != nil && !isNotFound(res.Err) { + return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err) } } } diff --git a/pkg/controller/service/BUILD b/pkg/controller/service/BUILD index c84ef3bd367..cfb191f9ca0 100644 --- a/pkg/controller/service/BUILD +++ b/pkg/controller/service/BUILD @@ -50,8 +50,6 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", - "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go index b3cba8a9312..38c6b99da2c 100644 --- a/pkg/controller/service/service_controller.go +++ b/pkg/controller/service/service_controller.go @@ -89,6 +89,7 @@ type serviceCache struct { type ServiceController struct { cloud cloudprovider.Interface knownHosts []*v1.Node + servicesToUpdate []*v1.Service kubeClient clientset.Interface clusterName string balancer cloudprovider.LoadBalancer @@ -243,20 +244,6 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s } } } - - if cachedService.state != nil { - if !s.needsUpdate(cachedService.state, service) { - // The service does not require an update which means it was placed on the work queue - // by the node sync loop and indicates that the hosts need to be updated. - err := s.updateLoadBalancerHosts(service) - if err != nil { - return err, cachedService.nextRetryDelay() - } - cachedService.resetRetryDelay() - return nil, doNotRetry - } - } - // cache the service, we need the info for service deletion cachedService.state = service err, retry := s.createLoadBalancerIfNeeded(key, service) @@ -451,8 +438,6 @@ func (s *serviceCache) delete(serviceName string) { delete(s.serviceMap, serviceName) } -// needsUpdate checks to see if there were any changes between the old and new service that would require a load balancer update. 
-// This method does not and should not check if the hosts have changed. func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool { if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) { return false @@ -651,45 +636,62 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate { } } -// nodeSyncLoop handles adding all existing cached services to the work queue -// to be reprocessed so that they can have their hosts updated, if any -// host changes have occurred since the last sync loop. +// nodeSyncLoop handles updating the hosts pointed to by all load +// balancers whenever the set of nodes in the cluster changes. func (s *ServiceController) nodeSyncLoop() { newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate()) if err != nil { glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err) return } - if nodeSlicesEqualForLB(newHosts, s.knownHosts) { - // Nothing to do since the hosts have not changed. + // The set of nodes in the cluster hasn't changed, but we can retry + // updating any services that we failed to update last time around. + s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) return } - glog.Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts)) + glog.Infof("Detected change in list of current cluster nodes. New node set: %v", + nodeNames(newHosts)) - for _, svc := range s.cache.allServices() { - s.enqueueService(svc) - } + // Try updating all services, and save the ones that fail to try again next + // round. + s.servicesToUpdate = s.cache.allServices() + numServices := len(s.servicesToUpdate) + s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) + glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", + numServices-len(s.servicesToUpdate), numServices) - // Update the known hosts so we can check next sync loop for changes. s.knownHosts = newHosts } -// Updates the load balancer of the service with updated nodes ONLY. -// This method will not trigger the cloud provider to create or full update a load balancer. -func (s *ServiceController) updateLoadBalancerHosts(service *v1.Service) error { +// updateLoadBalancerHosts updates all existing load balancers so that +// they will match the list of hosts provided. +// Returns the list of services that couldn't be updated. +func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) { + for _, service := range services { + func() { + if service == nil { + return + } + if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil { + glog.Errorf("External error while updating load balancer: %v.", err) + servicesToRetry = append(servicesToRetry, service) + } + }() + } + return servicesToRetry +} + +// Updates the load balancer of a service, assuming we hold the mutex +// associated with the service. 
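The nodeSyncLoop rewrite above keeps the services it failed to update in servicesToUpdate and retries them on the next pass even when the node set has not changed. A hedged, self-contained sketch of that accumulate-failures-and-retry pattern; updateBatch and the service names below are illustrative, not controller code:

package main

import "fmt"

// updateBatch tries every item once and returns only the ones that failed,
// so the caller can feed them back into the next sync round.
func updateBatch(items []string, updateOne func(string) error) (failed []string) {
	for _, it := range items {
		if err := updateOne(it); err != nil {
			fmt.Printf("update of %s failed, will retry next sync: %v\n", it, err)
			failed = append(failed, it)
		}
	}
	return failed
}

func main() {
	toUpdate := []string{"svc-a", "svc-b", "svc-c"}
	attempt := 0
	updateOne := func(name string) error {
		attempt++
		if name == "svc-b" && attempt <= 3 {
			return fmt.Errorf("transient cloud error")
		}
		return nil
	}
	// First round: every load balancer is tried, only failures are kept.
	toUpdate = updateBatch(toUpdate, updateOne)
	// Next round: even with an unchanged node set, the leftovers are retried.
	toUpdate = updateBatch(toUpdate, updateOne)
	fmt.Println("still pending:", toUpdate) // still pending: []
}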
+func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { if !wantsLoadBalancer(service) { return nil } - hosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate()) - if err != nil { - return err - } - // This operation doesn't normally take very long (and happens pretty often), so we only record the final event - err = s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts) + err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts) if err == nil { // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. if len(hosts) == 0 { diff --git a/pkg/controller/service/service_controller_test.go b/pkg/controller/service/service_controller_test.go index ce99574936e..0c4990adb1a 100644 --- a/pkg/controller/service/service_controller_test.go +++ b/pkg/controller/service/service_controller_test.go @@ -19,7 +19,6 @@ package service import ( "fmt" "reflect" - "sort" "testing" "time" @@ -28,8 +27,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/testapi" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" @@ -177,45 +174,23 @@ func TestCreateExternalLoadBalancer(t *testing.T) { } } -// newLoadBalancerNode returns a node that passes the predicate check for a -// node to receive load balancer traffic. -func newLoadBalancerNode(name string) *v1.Node { - return &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: v1.NodeSpec{ - Unschedulable: false, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - {Type: v1.NodeReady, Status: v1.ConditionTrue}, - }, - }, - } -} - -func sortNodesByName(nodes []*v1.Node) { - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].Name < nodes[j].Name - }) -} - // TODO: Finish converting and update comments func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { - nodes := []*v1.Node{ - newLoadBalancerNode("node0"), - newLoadBalancerNode("node1"), - newLoadBalancerNode("node73"), + {ObjectMeta: metav1.ObjectMeta{Name: "node0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node73"}}, } - sortNodesByName(nodes) - - table := map[string]struct { + table := []struct { services []*v1.Service expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall }{ - "update no load balancer": { + { + // No services present: no calls should be made. + services: []*v1.Service{}, + expectedUpdateCalls: nil, + }, + { // Services do not have external load balancers: no calls should be made. services: []*v1.Service{ newService("s0", "111", v1.ServiceTypeClusterIP), @@ -223,7 +198,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { }, expectedUpdateCalls: nil, }, - "update 1 load balancer": { + { // Services does have an external load balancer: one call should be made. services: []*v1.Service{ newService("s0", "333", v1.ServiceTypeLoadBalancer), @@ -232,7 +207,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, - "update 3 load balancers": { + { // Three services have an external load balancer: three calls. 
services: []*v1.Service{ newService("s0", "444", v1.ServiceTypeLoadBalancer), @@ -245,7 +220,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, - "update 2 load balancers": { + { // Two services have an external load balancer and two don't: two calls. services: []*v1.Service{ newService("s0", "777", v1.ServiceTypeNodePort), @@ -258,44 +233,30 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, + { + // One service has an external load balancer and one is nil: one call. + services: []*v1.Service{ + newService("s0", "234", v1.ServiceTypeLoadBalancer), + nil, + }, + expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ + {Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + }, + }, } + for _, item := range table { + controller, cloud, _ := newController() - for name, item := range table { - t.Run(name, func(t *testing.T) { - controller, cloud, _ := newController() - - var services []*v1.Service - for _, service := range item.services { - services = append(services, service) - } - nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) - for _, node := range nodes { - nodeIndexer.Add(node) - } - controller.nodeLister = corelisters.NewNodeLister(nodeIndexer) - - for _, service := range services { - if err := controller.updateLoadBalancerHosts(service); err != nil { - t.Errorf("unexpected error: %v", err) - } - } - - if len(item.expectedUpdateCalls) != len(cloud.UpdateCalls) { - t.Errorf("expected %d update calls but only got %d", len(item.expectedUpdateCalls), len(cloud.UpdateCalls)) - } - - for i, expectedCall := range item.expectedUpdateCalls { - actualCall := cloud.UpdateCalls[i] - if !reflect.DeepEqual(expectedCall.Service, actualCall.Service) { - t.Errorf("expected update call to contain service %+v, got %+v", expectedCall.Service, actualCall.Service) - } - - sortNodesByName(actualCall.Hosts) - if !reflect.DeepEqual(expectedCall.Hosts, actualCall.Hosts) { - t.Errorf("expected update call to contain hosts %+v, got %+v", expectedCall.Hosts, actualCall.Hosts) - } - } - }) + var services []*v1.Service + for _, service := range item.services { + services = append(services, service) + } + if err := controller.updateLoadBalancerHosts(services, nodes); err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) { + t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls) + } } } @@ -350,13 +311,6 @@ func TestProcessServiceUpdate(t *testing.T) { var controller *ServiceController var cloud *fakecloud.FakeCloud - nodes := []*v1.Node{ - newLoadBalancerNode("node0"), - newLoadBalancerNode("node1"), - newLoadBalancerNode("node73"), - } - sortNodesByName(nodes) - //A pair of old and new loadbalancer IP address oldLBIP := "192.168.1.1" newLBIP := "192.168.1.11" @@ -390,51 +344,6 @@ func TestProcessServiceUpdate(t *testing.T) { return nil }, }, - { - testName: "If updating hosts only", - key: "default/sync-test-name", - svc: newService("sync-test-name", types.UID("sync-test-uid"), v1.ServiceTypeLoadBalancer), - updateFn: func(svc *v1.Service) *v1.Service { - keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName() - cachedServiceTest := controller.cache.getOrCreate(keyExpected) - cachedServiceTest.state = svc - 
controller.cache.set(keyExpected, cachedServiceTest) - - // Set the nodes for the cloud's UpdateLoadBalancer call to use. - nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) - for _, node := range nodes { - nodeIndexer.Add(node) - } - controller.nodeLister = corelisters.NewNodeLister(nodeIndexer) - - // This should trigger the needsUpdate false check since the service equals the cached service - return svc - }, - expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error { - if err != nil { - return err - } - if retryDuration != doNotRetry { - return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration) - } - - if len(cloud.UpdateCalls) != 1 { - return fmt.Errorf("expected one update host call but only got %+v", cloud.UpdateCalls) - } - - actualCall := cloud.UpdateCalls[0] - if !reflect.DeepEqual(svc, actualCall.Service) { - return fmt.Errorf("expected update call to contain service %+v, got %+v", svc, actualCall.Service) - } - - sortNodesByName(actualCall.Hosts) - if !reflect.DeepEqual(nodes, actualCall.Hosts) { - return fmt.Errorf("expected update call to contain hosts %+v, got %+v", nodes, actualCall.Hosts) - } - - return nil - }, - }, { testName: "If Updating Loadbalancer IP", key: "default/sync-test-name", diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 1ed30a62fda..5002f5e41ba 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -206,6 +206,12 @@ const ( // alpha: v1.9 // Postpone deletion of a persistent volume claim in case it is used by a pod PVCProtection utilfeature.Feature = "PVCProtection" + + // owner: @aveshagarwal + // alpha: v1.9 + // + // Enable resource limits priority function + ResourceLimitsPriorityFunction utilfeature.Feature = "ResourceLimitsPriorityFunction" ) func init() { @@ -244,6 +250,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS CustomPodDNS: {Default: false, PreRelease: utilfeature.Alpha}, BlockVolume: {Default: false, PreRelease: utilfeature.Alpha}, PVCProtection: {Default: false, PreRelease: utilfeature.Alpha}, + ResourceLimitsPriorityFunction: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/kubelet/dockershim/libdocker/fake_client.go b/pkg/kubelet/dockershim/libdocker/fake_client.go index 529c9141991..4474fa4a5b6 100644 --- a/pkg/kubelet/dockershim/libdocker/fake_client.go +++ b/pkg/kubelet/dockershim/libdocker/fake_client.go @@ -410,7 +410,7 @@ func (f *FakeDockerClient) ListContainers(options dockertypes.ContainerListOptio var filtered []dockertypes.Container for _, container := range containerList { for _, statusFilter := range statusFilters { - if container.Status == statusFilter { + if toDockerContainerStatus(container.Status) == statusFilter { filtered = append(filtered, container) break } @@ -443,6 +443,19 @@ func (f *FakeDockerClient) ListContainers(options dockertypes.ContainerListOptio return containerList, err } +func toDockerContainerStatus(state string) string { + switch { + case strings.HasPrefix(state, StatusCreatedPrefix): + return "created" + case strings.HasPrefix(state, StatusRunningPrefix): + return "running" + case strings.HasPrefix(state, StatusExitedPrefix): + return "exited" + default: + return "unknown" + } +} + // InspectContainer is a test-spy implementation of Interface.InspectContainer. 
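The toDockerContainerStatus helper above normalizes Docker's human-readable status strings into the short states a list filter compares against, which is what makes the fake client's status filtering behave like the real one. A standalone sketch of that mapping, assuming the prefixes look like "Created", "Up", and "Exited" (the real values come from the libdocker StatusCreatedPrefix, StatusRunningPrefix, and StatusExitedPrefix constants):

package main

import (
	"fmt"
	"strings"
)

// toStatus maps a human-readable container status onto the short state name
// that Docker's status filters use.
func toStatus(human string) string {
	switch {
	case strings.HasPrefix(human, "Created"):
		return "created"
	case strings.HasPrefix(human, "Up"):
		return "running"
	case strings.HasPrefix(human, "Exited"):
		return "exited"
	default:
		return "unknown"
	}
}

func main() {
	// Without normalization, "Up 2 hours" never equals the filter "running".
	fmt.Println(toStatus("Up 2 hours"))            // running
	fmt.Println(toStatus("Exited (0) 1 hour ago")) // exited
}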
// It adds an entry "inspect" to the internal method call record. func (f *FakeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) { @@ -565,6 +578,18 @@ func (f *FakeDockerClient) StartContainer(id string) error { } f.appendContainerTrace("Started", id) container, ok := f.ContainerMap[id] + if container.HostConfig.NetworkMode.IsContainer() { + hostContainerID := container.HostConfig.NetworkMode.ConnectedContainer() + found := false + for _, container := range f.RunningContainerList { + if container.ID == hostContainerID { + found = true + } + } + if !found { + return fmt.Errorf("failed to start container \"%s\": Error response from daemon: cannot join network of a non running container: %s", id, hostContainerID) + } + } timestamp := f.Clock.Now() if !ok { container = convertFakeContainer(&FakeContainer{ID: id, Name: id, CreatedAt: timestamp}) diff --git a/pkg/volume/aws_ebs/attacher.go b/pkg/volume/aws_ebs/attacher.go index ac8ea48fe1a..ce96b94ddde 100644 --- a/pkg/volume/aws_ebs/attacher.go +++ b/pkg/volume/aws_ebs/attacher.go @@ -256,21 +256,7 @@ func (plugin *awsElasticBlockStorePlugin) NewDetacher() (volume.Detacher, error) func (detacher *awsElasticBlockStoreDetacher) Detach(volumeName string, nodeName types.NodeName) error { volumeID := aws.KubernetesVolumeID(path.Base(volumeName)) - attached, err := detacher.awsVolumes.DiskIsAttached(volumeID, nodeName) - if err != nil { - // Log error and continue with detach - glog.Errorf( - "Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v", - volumeID, nodeName, err) - } - - if err == nil && !attached { - // Volume is already detached from node. - glog.Infof("detach operation was successful. volume %q is already detached from node %q.", volumeID, nodeName) - return nil - } - - if _, err = detacher.awsVolumes.DetachDisk(volumeID, nodeName); err != nil { + if _, err := detacher.awsVolumes.DetachDisk(volumeID, nodeName); err != nil { glog.Errorf("Error detaching volumeID %q: %v", volumeID, err) return err } diff --git a/pkg/volume/aws_ebs/attacher_test.go b/pkg/volume/aws_ebs/attacher_test.go index 813139e5b95..1076d06910e 100644 --- a/pkg/volume/aws_ebs/attacher_test.go +++ b/pkg/volume/aws_ebs/attacher_test.go @@ -62,10 +62,9 @@ func TestGetVolumeName_PersistentVolume(t *testing.T) { type testcase struct { name aws.KubernetesVolumeID // For fake AWS: - attach attachCall - detach detachCall - diskIsAttached diskIsAttachedCall - t *testing.T + attach attachCall + detach detachCall + t *testing.T // Actual test to run test func(test *testcase) (string, error) @@ -81,7 +80,6 @@ func TestAttachDetach(t *testing.T) { spec := createVolSpec(diskName, readOnly) attachError := errors.New("Fake attach error") detachError := errors.New("Fake detach error") - diskCheckError := errors.New("Fake DiskIsAttached error") tests := []testcase{ // Successful Attach call { @@ -107,44 +105,18 @@ func TestAttachDetach(t *testing.T) { // Detach succeeds { - name: "Detach_Positive", - diskIsAttached: diskIsAttachedCall{diskName, nodeName, true, nil}, - detach: detachCall{diskName, nodeName, "/dev/sda", nil}, + name: "Detach_Positive", + detach: detachCall{diskName, nodeName, "/dev/sda", nil}, test: func(testcase *testcase) (string, error) { detacher := newDetacher(testcase) mountPath := "/mnt/" + string(diskName) return "", detacher.Detach(mountPath, nodeName) }, }, - - // Disk is already detached - { - name: "Detach_Positive_AlreadyDetached", - diskIsAttached: 
diskIsAttachedCall{diskName, nodeName, false, nil}, - test: func(testcase *testcase) (string, error) { - detacher := newDetacher(testcase) - mountPath := "/mnt/" + string(diskName) - return "", detacher.Detach(mountPath, nodeName) - }, - }, - - // Detach succeeds when DiskIsAttached fails - { - name: "Detach_Positive_CheckFails", - diskIsAttached: diskIsAttachedCall{diskName, nodeName, false, diskCheckError}, - detach: detachCall{diskName, nodeName, "/dev/sda", nil}, - test: func(testcase *testcase) (string, error) { - detacher := newDetacher(testcase) - mountPath := "/mnt/" + string(diskName) - return "", detacher.Detach(mountPath, nodeName) - }, - }, - // Detach fails { - name: "Detach_Negative", - diskIsAttached: diskIsAttachedCall{diskName, nodeName, false, diskCheckError}, - detach: detachCall{diskName, nodeName, "", detachError}, + name: "Detach_Negative", + detach: detachCall{diskName, nodeName, "", detachError}, test: func(testcase *testcase) (string, error) { detacher := newDetacher(testcase) mountPath := "/mnt/" + string(diskName) @@ -298,28 +270,8 @@ func (testcase *testcase) DetachDisk(diskName aws.KubernetesVolumeID, nodeName t } func (testcase *testcase) DiskIsAttached(diskName aws.KubernetesVolumeID, nodeName types.NodeName) (bool, error) { - expected := &testcase.diskIsAttached - - if expected.diskName == "" && expected.nodeName == "" { - // testcase.diskIsAttached looks uninitialized, test did not expect to - // call DiskIsAttached - testcase.t.Errorf("Unexpected DiskIsAttached call!") - return false, errors.New("Unexpected DiskIsAttached call!") - } - - if expected.diskName != diskName { - testcase.t.Errorf("Unexpected DiskIsAttached call: expected diskName %s, got %s", expected.diskName, diskName) - return false, errors.New("Unexpected DiskIsAttached call: wrong diskName") - } - - if expected.nodeName != nodeName { - testcase.t.Errorf("Unexpected DiskIsAttached call: expected nodeName %s, got %s", expected.nodeName, nodeName) - return false, errors.New("Unexpected DiskIsAttached call: wrong nodeName") - } - - glog.V(4).Infof("DiskIsAttached call: %s, %s, returning %v, %v", diskName, nodeName, expected.isAttached, expected.ret) - - return expected.isAttached, expected.ret + // DetachDisk no longer relies on DiskIsAttached api call + return false, nil } func (testcase *testcase) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) { diff --git a/plugin/pkg/scheduler/algorithm/priorities/BUILD b/plugin/pkg/scheduler/algorithm/priorities/BUILD index 62913a89bd4..f473a44b4c2 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/BUILD +++ b/plugin/pkg/scheduler/algorithm/priorities/BUILD @@ -19,6 +19,7 @@ go_library( "node_label.go", "node_prefer_avoid_pods.go", "reduce.go", + "resource_limits.go", "selector_spreading.go", "taint_toleration.go", "test_util.go", @@ -54,6 +55,7 @@ go_test( "node_affinity_test.go", "node_label_test.go", "node_prefer_avoid_pods_test.go", + "resource_limits_test.go", "selector_spreading_test.go", "taint_toleration_test.go", ], diff --git a/plugin/pkg/scheduler/algorithm/priorities/resource_limits.go b/plugin/pkg/scheduler/algorithm/priorities/resource_limits.go new file mode 100644 index 00000000000..77ae0dca923 --- /dev/null +++ b/plugin/pkg/scheduler/algorithm/priorities/resource_limits.go @@ -0,0 +1,128 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package priorities + +import ( + "fmt" + + "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + + "github.com/golang/glog" +) + +// ResourceLimitsPriorityMap is a priority function that increases score of input node by 1 if the node satisfies +// input pod's resource limits. In detail, this priority function works as follows: If a node does not publish its +// allocatable resources (cpu and memory both), the node score is not affected. If a pod does not specify +// its cpu and memory limits both, the node score is not affected. If one or both of cpu and memory limits +// of the pod are satisfied, the node is assigned a score of 1. +// Rationale of choosing the lowest score of 1 is that this is mainly selected to break ties between nodes that have +// same scores assigned by one of least and most requested priority functions. +func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) { + node := nodeInfo.Node() + if node == nil { + return schedulerapi.HostPriority{}, fmt.Errorf("node not found") + } + + allocatableResources := nodeInfo.AllocatableResource() + + // compute pod limits + podLimits := getResourceLimits(pod) + + cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU) + memScore := computeScore(podLimits.Memory, allocatableResources.Memory) + + score := int(0) + if cpuScore == 1 || memScore == 1 { + score = 1 + } + + if glog.V(10) { + // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is + // not logged. There is visible performance gain from it. + glog.Infof( + "%v -> %v: Resource Limits Priority, allocatable %d millicores %d memory bytes, pod limits %d millicores %d memory bytes, score %d", + pod.Name, node.Name, + allocatableResources.MilliCPU, allocatableResources.Memory, + podLimits.MilliCPU, podLimits.Memory, + score, + ) + } + + return schedulerapi.HostPriority{ + Host: node.Name, + Score: score, + }, nil +} + +// computeScore return 1 if limit value is less than or equal to allocable +// value, otherwise it returns 0. +func computeScore(limit, allocatable int64) int64 { + if limit != 0 && allocatable != 0 && limit <= allocatable { + return 1 + } + return 0 +} + +// getResourceLimits computes resource limits for input pod. +// The reason to create this new function is to be consistent with other +// priority functions because most or perhaps all priority functions work +// with schedulercache.Resource. +// TODO: cache it as part of metadata passed to priority functions. 
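The scoring in ResourceLimitsPriorityMap and computeScore above reduces to a 0-or-1 tie-breaker: a node scores 1 only when it publishes an allocatable value and at least one of the pod's cpu or memory limits fits. A minimal sketch with plain integers standing in for the scheduler's Resource type:

package main

import "fmt"

// fits mirrors computeScore: both values must be published and the limit
// must not exceed what the node can allocate.
func fits(limit, allocatable int64) bool {
	return limit != 0 && allocatable != 0 && limit <= allocatable
}

// score returns 1 when either the cpu or the memory limit fits, 0 otherwise;
// the low value is deliberate, it only breaks ties between otherwise equal nodes.
func score(cpuLimit, memLimit, cpuAlloc, memAlloc int64) int {
	if fits(cpuLimit, cpuAlloc) || fits(memLimit, memAlloc) {
		return 1
	}
	return 0
}

func main() {
	fmt.Println(score(3000, 5000, 4000, 10000)) // 1: both limits fit
	fmt.Println(score(3000, 0, 2000, 10000))    // 0: cpu does not fit, memory limit unset
	fmt.Println(score(0, 0, 4000, 10000))       // 0: pod declares no limits
}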
+func getResourceLimits(pod *v1.Pod) *schedulercache.Resource { + result := &schedulercache.Resource{} + for _, container := range pod.Spec.Containers { + result.Add(container.Resources.Limits) + } + + // take max_resource(sum_pod, any_init_container) + for _, container := range pod.Spec.InitContainers { + for rName, rQuantity := range container.Resources.Limits { + switch rName { + case v1.ResourceMemory: + if mem := rQuantity.Value(); mem > result.Memory { + result.Memory = mem + } + case v1.ResourceCPU: + if cpu := rQuantity.MilliValue(); cpu > result.MilliCPU { + result.MilliCPU = cpu + } + // keeping these resources though score computation in other priority functions and in this + // are only computed based on cpu and memory only. + case v1.ResourceEphemeralStorage: + if ephemeralStorage := rQuantity.Value(); ephemeralStorage > result.EphemeralStorage { + result.EphemeralStorage = ephemeralStorage + } + case v1.ResourceNvidiaGPU: + if gpu := rQuantity.Value(); gpu > result.NvidiaGPU { + result.NvidiaGPU = gpu + } + default: + if v1helper.IsScalarResourceName(rName) { + value := rQuantity.Value() + if value > result.ScalarResources[rName] { + result.SetScalar(rName, value) + } + } + } + } + } + + return result +} diff --git a/plugin/pkg/scheduler/algorithm/priorities/resource_limits_test.go b/plugin/pkg/scheduler/algorithm/priorities/resource_limits_test.go new file mode 100644 index 00000000000..0a48cc73308 --- /dev/null +++ b/plugin/pkg/scheduler/algorithm/priorities/resource_limits_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package priorities + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + //metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" +) + +func TestResourceLimistPriority(t *testing.T) { + noResources := v1.PodSpec{ + Containers: []v1.Container{}, + } + + cpuOnly := v1.PodSpec{ + NodeName: "machine1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + }, + }, + } + + memOnly := v1.PodSpec{ + NodeName: "machine2", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + v1.ResourceMemory: resource.MustParse("2000"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + v1.ResourceMemory: resource.MustParse("3000"), + }, + }, + }, + }, + } + + cpuAndMemory := v1.PodSpec{ + NodeName: "machine2", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("2000"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("3000"), + }, + }, + }, + }, + } + + tests := []struct { + // input pod + pod *v1.Pod + nodes []*v1.Node + expectedList schedulerapi.HostPriorityList + test string + }{ + { + pod: &v1.Pod{Spec: noResources}, + nodes: []*v1.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 4000, 0), makeNode("machine3", 0, 10000), makeNode("machine4", 0, 0)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}, {Host: "machine3", Score: 0}, {Host: "machine4", Score: 0}}, + test: "pod does not specify its resource limits", + }, + { + pod: &v1.Pod{Spec: cpuOnly}, + nodes: []*v1.Node{makeNode("machine1", 3000, 10000), makeNode("machine2", 2000, 10000)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 0}}, + test: "pod only specifies cpu limits", + }, + { + pod: &v1.Pod{Spec: memOnly}, + nodes: []*v1.Node{makeNode("machine1", 4000, 4000), makeNode("machine2", 5000, 10000)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 1}}, + test: "pod only specifies mem limits", + }, + { + pod: &v1.Pod{Spec: cpuAndMemory}, + nodes: []*v1.Node{makeNode("machine1", 4000, 4000), makeNode("machine2", 5000, 10000)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 1}}, + test: "pod specifies both cpu and mem limits", + }, + { + pod: &v1.Pod{Spec: cpuAndMemory}, + nodes: []*v1.Node{makeNode("machine1", 0, 0)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}}, + test: "node does not advertise its allocatables", + }, + } + + for _, test := range tests { + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) + list, err := priorityFunction(ResourceLimitsPriorityMap, nil, nil)(test.pod, 
nodeNameToInfo, test.nodes) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) + } + } + +} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 5bb5f136192..1b7b34d96bd 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -106,6 +106,10 @@ func init() { factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1) // Optional, cluster-autoscaler friendly priority function - give used nodes higher priority. factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1) + // Prioritizes nodes that satisfy pod's resource limits + if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) { + factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1) + } } func defaultPredicates() sets.String {
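The defaults.go hunk registers the new priority function only when the alpha ResourceLimitsPriorityFunction gate is on, so default scheduling behavior is unchanged until an operator opts in (typically by passing --feature-gates=ResourceLimitsPriorityFunction=true to the scheduler). A hedged sketch of that gate-guarded registration pattern, using a stand-in registry instead of the scheduler's factory package:

package main

import "fmt"

type priorityFunc func(node string) int

var registry = map[string]priorityFunc{}

// registerPriority wires a priority function in only when its feature gate
// is enabled, leaving the default set untouched otherwise.
func registerPriority(name string, f priorityFunc, gateEnabled bool) {
	if !gateEnabled {
		return
	}
	registry[name] = f
}

func main() {
	const resourceLimitsGate = true // stand-in for the real feature-gate lookup
	registerPriority("ResourceLimitsPriority", func(string) int { return 1 }, resourceLimitsGate)
	fmt.Println(len(registry)) // 1 when the gate is on, 0 otherwise
}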