From 7215ce30b1f0c2ea8986572c3b2faf66b1f47b82 Mon Sep 17 00:00:00 2001 From: FengyunPan Date: Thu, 26 Oct 2017 20:23:16 +0800 Subject: [PATCH 1/9] Add service.UID into security group name Related to: #53714 --- .../openstack/openstack_loadbalancer.go | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go index 98b5b44ed77..90c95557324 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go +++ b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go @@ -292,8 +292,14 @@ func popMember(members []v2pools.Member, addr string, port int) []v2pools.Member return members } -func getSecurityGroupName(clusterName string, service *v1.Service) string { - return fmt.Sprintf("lb-sg-%s-%s-%s", clusterName, service.Namespace, service.Name) +func getSecurityGroupName(service *v1.Service) string { + securityGroupName := fmt.Sprintf("lb-sg-%s-%s-%s", service.UID, service.Namespace, service.Name) + //OpenStack requires that the name of a security group is shorter than 255 bytes. + if len(securityGroupName) > 255 { + securityGroupName = securityGroupName[:255] + } + + return securityGroupName } func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) { @@ -899,7 +905,7 @@ func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Ser } // ensure security group for LB - lbSecGroupName := getSecurityGroupName(clusterName, apiService) + lbSecGroupName := getSecurityGroupName(apiService) lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) if err != nil { // check whether security group does not exist @@ -914,8 +920,8 @@ func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Ser if len(lbSecGroupID) == 0 { // create security group lbSecGroupCreateOpts := groups.CreateOpts{ - Name: getSecurityGroupName(clusterName, apiService), - Description: fmt.Sprintf("Securty Group for loadbalancer service %s/%s", apiService.Namespace, apiService.Name), + Name: getSecurityGroupName(apiService), + Description: fmt.Sprintf("Securty Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName), } lbSecGroup, err := groups.Create(lbaas.network, lbSecGroupCreateOpts).Extract() @@ -1197,7 +1203,7 @@ func (lbaas *LbaasV2) updateSecurityGroup(clusterName string, apiService *v1.Ser removals := original.Difference(current) // Generate Name - lbSecGroupName := getSecurityGroupName(clusterName, apiService) + lbSecGroupName := getSecurityGroupName(apiService) lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) if err != nil { return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err) @@ -1369,7 +1375,7 @@ func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(clusterName string, service *v1. 
// Delete the Security Group if lbaas.opts.ManageSecurityGroups { // Generate Name - lbSecGroupName := getSecurityGroupName(clusterName, service) + lbSecGroupName := getSecurityGroupName(service) lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) if err != nil { // check whether security group does not exist From 669520f9bbc197496ab632cc1c34fb25343e28c2 Mon Sep 17 00:00:00 2001 From: FengyunPan Date: Tue, 21 Nov 2017 09:38:43 +0800 Subject: [PATCH 2/9] Add EnsureOldSecurityGroupDeleted to delete old security group Consider the migration from the old security group name to the new security group name, we need delete the old security group. At V1.10, we can assume everyone is using the new security group names and remove this code. --- .../openstack/openstack_loadbalancer.go | 165 ++++++++++++++---- 1 file changed, 127 insertions(+), 38 deletions(-) diff --git a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go index 90c95557324..471fdc31a61 100644 --- a/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go +++ b/pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go @@ -874,6 +874,14 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv _ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService) return status, err } + + // delete the old Security Group for the service + // Related to #53764 + // TODO(FengyunPan): Remove it at V1.10 + err = lbaas.EnsureOldSecurityGroupDeleted(clusterName, apiService) + if err != nil { + return status, fmt.Errorf("Failed to delete the Security Group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err) + } } return status, nil @@ -921,7 +929,7 @@ func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Ser // create security group lbSecGroupCreateOpts := groups.CreateOpts{ Name: getSecurityGroupName(apiService), - Description: fmt.Sprintf("Securty Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName), + Description: fmt.Sprintf("Security Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName), } lbSecGroup, err := groups.Create(lbaas.network, lbSecGroupCreateOpts).Extract() @@ -1180,7 +1188,7 @@ func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *v1.Service if lbaas.opts.ManageSecurityGroups { err := lbaas.updateSecurityGroup(clusterName, service, nodes, loadbalancer) if err != nil { - return fmt.Errorf("failed to update Securty Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err) + return fmt.Errorf("failed to update Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err) } } @@ -1374,50 +1382,131 @@ func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(clusterName string, service *v1. // Delete the Security Group if lbaas.opts.ManageSecurityGroups { - // Generate Name - lbSecGroupName := getSecurityGroupName(service) - lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) + err := lbaas.EnsureSecurityGroupDeleted(clusterName, service) if err != nil { - // check whether security group does not exist - _, ok := err.(*gophercloud.ErrResourceNotFound) - if ok { - // It is OK when the security group has been deleted by others. 
- return nil - } else { - return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err) - } + return fmt.Errorf("Failed to delete Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err) } - lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID) - if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) { - return lbSecGroup.Err + // delete the old Security Group for the service + // Related to #53764 + // TODO(FengyunPan): Remove it at V1.10 + err = lbaas.EnsureOldSecurityGroupDeleted(clusterName, service) + if err != nil { + return fmt.Errorf("Failed to delete the Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err) } + } - if len(lbaas.opts.NodeSecurityGroupIDs) == 0 { - // Just happen when nodes have not Security Group, or should not happen - // UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty - // And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted. - glog.Warningf("Can not find node-security-group from all the nodes of this cluser when delete loadbalancer service %s/%s", - service.Namespace, service.Name) + return nil +} + +// EnsureSecurityGroupDeleted deleting security group for specific loadbalancer service. +func (lbaas *LbaasV2) EnsureSecurityGroupDeleted(clusterName string, service *v1.Service) error { + // Generate Name + lbSecGroupName := getSecurityGroupName(service) + lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) + if err != nil { + // check whether security group does not exist + _, ok := err.(*gophercloud.ErrResourceNotFound) + if ok { + // It is OK when the security group has been deleted by others. + return nil } else { - // Delete the rules in the Node Security Group - for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs { - opts := rules.ListOpts{ - SecGroupID: nodeSecurityGroupID, - RemoteGroupID: lbSecGroupID, - } - secGroupRules, err := getSecurityGroupRules(lbaas.network, opts) + return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err) + } + } - if err != nil && !isNotFound(err) { - msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err) - return fmt.Errorf(msg) - } + lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID) + if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) { + return lbSecGroup.Err + } - for _, rule := range secGroupRules { - res := rules.Delete(lbaas.network, rule.ID) - if res.Err != nil && !isNotFound(res.Err) { - return fmt.Errorf("error occurred deleting security group rule: %s: %v", rule.ID, res.Err) - } + if len(lbaas.opts.NodeSecurityGroupIDs) == 0 { + // Just happen when nodes have not Security Group, or should not happen + // UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty + // And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted. 
+ glog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s", + service.Namespace, service.Name) + } else { + // Delete the rules in the Node Security Group + for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs { + opts := rules.ListOpts{ + SecGroupID: nodeSecurityGroupID, + RemoteGroupID: lbSecGroupID, + } + secGroupRules, err := getSecurityGroupRules(lbaas.network, opts) + + if err != nil && !isNotFound(err) { + msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err) + return fmt.Errorf(msg) + } + + for _, rule := range secGroupRules { + res := rules.Delete(lbaas.network, rule.ID) + if res.Err != nil && !isNotFound(res.Err) { + return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err) + } + } + } + } + + return nil +} + +// getOldSecurityGroupName is used to get the old security group name +// Related to #53764 +// TODO(FengyunPan): Remove it at V1.10 +func getOldSecurityGroupName(clusterName string, service *v1.Service) string { + return fmt.Sprintf("lb-sg-%s-%v", clusterName, service.Name) +} + +// EnsureOldSecurityGroupDeleted deleting old security group for specific loadbalancer service. +// Related to #53764 +// TODO(FengyunPan): Remove it at V1.10 +func (lbaas *LbaasV2) EnsureOldSecurityGroupDeleted(clusterName string, service *v1.Service) error { + glog.V(4).Infof("EnsureOldSecurityGroupDeleted(%v, %v)", clusterName, service) + // Generate Name + lbSecGroupName := getOldSecurityGroupName(clusterName, service) + lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName) + if err != nil { + // check whether security group does not exist + _, ok := err.(*gophercloud.ErrResourceNotFound) + if ok { + // It is OK when the security group has been deleted by others. + return nil + } else { + return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err) + } + } + + lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID) + if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) { + return lbSecGroup.Err + } + + if len(lbaas.opts.NodeSecurityGroupIDs) == 0 { + // Just happen when nodes have not Security Group, or should not happen + // UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty + // And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted. 
+ glog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s", + service.Namespace, service.Name) + } else { + // Delete the rules in the Node Security Group + for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs { + opts := rules.ListOpts{ + SecGroupID: nodeSecurityGroupID, + RemoteGroupID: lbSecGroupID, + } + secGroupRules, err := getSecurityGroupRules(lbaas.network, opts) + + if err != nil && !isNotFound(err) { + msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err) + return fmt.Errorf(msg) + } + + for _, rule := range secGroupRules { + res := rules.Delete(lbaas.network, rule.ID) + if res.Err != nil && !isNotFound(res.Err) { + return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err) } } } From ac2c68ad8f29e004f2330e590fe0a61f57965038 Mon Sep 17 00:00:00 2001 From: Hemant Kumar Date: Thu, 16 Nov 2017 09:31:09 -0500 Subject: [PATCH 3/9] AWS: Implement fix for detaching volume from stopped instances Clean up detach disk functions and remove duplication --- pkg/cloudprovider/providers/aws/aws.go | 64 ++++++++------------- pkg/cloudprovider/providers/aws/volumes.go | 65 +++++++++++++++++++++ pkg/volume/aws_ebs/attacher.go | 16 +----- pkg/volume/aws_ebs/attacher_test.go | 66 +++------------------- 4 files changed, 97 insertions(+), 114 deletions(-) diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index c25b0f58ea2..5f8d0ab36cf 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -2021,26 +2021,24 @@ func (c *Cloud) AttachDisk(diskName KubernetesVolumeID, nodeName types.NodeName, // DetachDisk implements Volumes.DetachDisk func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName) (string, error) { - disk, err := newAWSDisk(c, diskName) - if err != nil { + diskInfo, attached, err := c.checkIfAttachedToNode(diskName, nodeName) + + if diskInfo == nil { return "", err } - awsInstance, info, err := c.getFullInstance(nodeName) - if err != nil { - if err == cloudprovider.InstanceNotFound { - // If instance no longer exists, safe to assume volume is not attached. - glog.Warningf( - "Instance %q does not exist. 
DetachDisk will assume disk %q is not attached to it.", - nodeName, - diskName) - return "", nil - } - - return "", err + if !attached && diskInfo.ec2Instance != nil { + glog.Warningf("DetachDisk %s called for node %s but volume is attached to node %s", diskName, nodeName, diskInfo.nodeName) + return "", nil } - mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, info, disk.awsID, false) + if !attached { + return "", nil + } + + awsInstance := newAWSInstance(c.ec2, diskInfo.ec2Instance) + + mountDevice, alreadyAttached, err := c.getMountDevice(awsInstance, diskInfo.ec2Instance, diskInfo.disk.awsID, false) if err != nil { return "", err } @@ -2052,18 +2050,19 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName) request := ec2.DetachVolumeInput{ InstanceId: &awsInstance.awsID, - VolumeId: disk.awsID.awsString(), + VolumeId: diskInfo.disk.awsID.awsString(), } response, err := c.ec2.DetachVolume(&request) if err != nil { - return "", fmt.Errorf("error detaching EBS volume %q from %q: %q", disk.awsID, awsInstance.awsID, err) + return "", fmt.Errorf("error detaching EBS volume %q from %q: %q", diskInfo.disk.awsID, awsInstance.awsID, err) } + if response == nil { return "", errors.New("no response from DetachVolume") } - attachment, err := disk.waitForAttachmentStatus("detached") + attachment, err := diskInfo.disk.waitForAttachmentStatus("detached") if err != nil { return "", err } @@ -2076,7 +2075,7 @@ func (c *Cloud) DetachDisk(diskName KubernetesVolumeID, nodeName types.NodeName) } if mountDevice != "" { - c.endAttaching(awsInstance, disk.awsID, mountDevice) + c.endAttaching(awsInstance, diskInfo.disk.awsID, mountDevice) // We don't check the return value - we don't really expect the attachment to have been // in progress, though it might have been } @@ -2320,32 +2319,13 @@ func (c *Cloud) GetDiskPath(volumeName KubernetesVolumeID) (string, error) { // DiskIsAttached implements Volumes.DiskIsAttached func (c *Cloud) DiskIsAttached(diskName KubernetesVolumeID, nodeName types.NodeName) (bool, error) { - _, instance, err := c.getFullInstance(nodeName) - if err != nil { - if err == cloudprovider.InstanceNotFound { - // If instance no longer exists, safe to assume volume is not attached. - glog.Warningf( - "Instance %q does not exist. DiskIsAttached will assume disk %q is not attached to it.", - nodeName, - diskName) - return false, nil - } + diskInfo, attached, err := c.checkIfAttachedToNode(diskName, nodeName) - return false, err + if diskInfo == nil { + return true, err } - diskID, err := diskName.mapToAWSVolumeID() - if err != nil { - return false, fmt.Errorf("error mapping volume spec %q to aws id: %v", diskName, err) - } - - for _, blockDevice := range instance.BlockDeviceMappings { - id := awsVolumeID(aws.StringValue(blockDevice.Ebs.VolumeId)) - if id == diskID { - return true, nil - } - } - return false, nil + return attached, nil } func (c *Cloud) DisksAreAttached(nodeDisks map[types.NodeName][]KubernetesVolumeID) (map[types.NodeName]map[KubernetesVolumeID]bool, error) { diff --git a/pkg/cloudprovider/providers/aws/volumes.go b/pkg/cloudprovider/providers/aws/volumes.go index c67f080fec1..3a4aa6284eb 100644 --- a/pkg/cloudprovider/providers/aws/volumes.go +++ b/pkg/cloudprovider/providers/aws/volumes.go @@ -23,6 +23,9 @@ import ( "strings" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/types" ) // awsVolumeRegMatch represents Regex Match for AWS volume. 
@@ -46,6 +49,16 @@ func (i awsVolumeID) awsString() *string { // * type KubernetesVolumeID string +// DiskInfo returns aws disk information in easy to use manner +type diskInfo struct { + ec2Instance *ec2.Instance + nodeName types.NodeName + volumeState string + attachmentState string + hasAttachment bool + disk *awsDisk +} + // mapToAWSVolumeID extracts the awsVolumeID from the KubernetesVolumeID func (name KubernetesVolumeID) mapToAWSVolumeID() (awsVolumeID, error) { // name looks like aws://availability-zone/awsVolumeId @@ -85,3 +98,55 @@ func (name KubernetesVolumeID) mapToAWSVolumeID() (awsVolumeID, error) { return awsVolumeID(awsID), nil } + +func GetAWSVolumeID(kubeVolumeID string) (string, error) { + kid := KubernetesVolumeID(kubeVolumeID) + awsID, err := kid.mapToAWSVolumeID() + return string(awsID), err +} + +func (c *Cloud) checkIfAttachedToNode(diskName KubernetesVolumeID, nodeName types.NodeName) (*diskInfo, bool, error) { + disk, err := newAWSDisk(c, diskName) + + if err != nil { + return nil, true, err + } + + awsDiskInfo := &diskInfo{ + disk: disk, + } + + info, err := disk.describeVolume() + + if err != nil { + describeError := fmt.Errorf("Error describing volume %s with %v", diskName, err) + glog.Warning(describeError) + awsDiskInfo.volumeState = "unknown" + return awsDiskInfo, false, describeError + } + + awsDiskInfo.volumeState = aws.StringValue(info.State) + + if len(info.Attachments) > 0 { + attachment := info.Attachments[0] + awsDiskInfo.attachmentState = aws.StringValue(attachment.State) + instanceID := aws.StringValue(attachment.InstanceId) + instanceInfo, err := c.getInstanceByID(instanceID) + + // This should never happen but if it does it could mean there was a race and instance + // has been deleted + if err != nil { + fetchErr := fmt.Errorf("Error fetching instance %s for volume %s", instanceID, diskName) + glog.Warning(fetchErr) + return awsDiskInfo, false, fetchErr + } + + awsDiskInfo.ec2Instance = instanceInfo + awsDiskInfo.nodeName = mapInstanceToNodeName(instanceInfo) + awsDiskInfo.hasAttachment = true + if awsDiskInfo.nodeName == nodeName { + return awsDiskInfo, true, nil + } + } + return awsDiskInfo, false, nil +} diff --git a/pkg/volume/aws_ebs/attacher.go b/pkg/volume/aws_ebs/attacher.go index ac8ea48fe1a..ce96b94ddde 100644 --- a/pkg/volume/aws_ebs/attacher.go +++ b/pkg/volume/aws_ebs/attacher.go @@ -256,21 +256,7 @@ func (plugin *awsElasticBlockStorePlugin) NewDetacher() (volume.Detacher, error) func (detacher *awsElasticBlockStoreDetacher) Detach(volumeName string, nodeName types.NodeName) error { volumeID := aws.KubernetesVolumeID(path.Base(volumeName)) - attached, err := detacher.awsVolumes.DiskIsAttached(volumeID, nodeName) - if err != nil { - // Log error and continue with detach - glog.Errorf( - "Error checking if volume (%q) is already attached to current node (%q). Will continue and try detach anyway. err=%v", - volumeID, nodeName, err) - } - - if err == nil && !attached { - // Volume is already detached from node. - glog.Infof("detach operation was successful. 
volume %q is already detached from node %q.", volumeID, nodeName) - return nil - } - - if _, err = detacher.awsVolumes.DetachDisk(volumeID, nodeName); err != nil { + if _, err := detacher.awsVolumes.DetachDisk(volumeID, nodeName); err != nil { glog.Errorf("Error detaching volumeID %q: %v", volumeID, err) return err } diff --git a/pkg/volume/aws_ebs/attacher_test.go b/pkg/volume/aws_ebs/attacher_test.go index 813139e5b95..1076d06910e 100644 --- a/pkg/volume/aws_ebs/attacher_test.go +++ b/pkg/volume/aws_ebs/attacher_test.go @@ -62,10 +62,9 @@ func TestGetVolumeName_PersistentVolume(t *testing.T) { type testcase struct { name aws.KubernetesVolumeID // For fake AWS: - attach attachCall - detach detachCall - diskIsAttached diskIsAttachedCall - t *testing.T + attach attachCall + detach detachCall + t *testing.T // Actual test to run test func(test *testcase) (string, error) @@ -81,7 +80,6 @@ func TestAttachDetach(t *testing.T) { spec := createVolSpec(diskName, readOnly) attachError := errors.New("Fake attach error") detachError := errors.New("Fake detach error") - diskCheckError := errors.New("Fake DiskIsAttached error") tests := []testcase{ // Successful Attach call { @@ -107,44 +105,18 @@ func TestAttachDetach(t *testing.T) { // Detach succeeds { - name: "Detach_Positive", - diskIsAttached: diskIsAttachedCall{diskName, nodeName, true, nil}, - detach: detachCall{diskName, nodeName, "/dev/sda", nil}, + name: "Detach_Positive", + detach: detachCall{diskName, nodeName, "/dev/sda", nil}, test: func(testcase *testcase) (string, error) { detacher := newDetacher(testcase) mountPath := "/mnt/" + string(diskName) return "", detacher.Detach(mountPath, nodeName) }, }, - - // Disk is already detached - { - name: "Detach_Positive_AlreadyDetached", - diskIsAttached: diskIsAttachedCall{diskName, nodeName, false, nil}, - test: func(testcase *testcase) (string, error) { - detacher := newDetacher(testcase) - mountPath := "/mnt/" + string(diskName) - return "", detacher.Detach(mountPath, nodeName) - }, - }, - - // Detach succeeds when DiskIsAttached fails - { - name: "Detach_Positive_CheckFails", - diskIsAttached: diskIsAttachedCall{diskName, nodeName, false, diskCheckError}, - detach: detachCall{diskName, nodeName, "/dev/sda", nil}, - test: func(testcase *testcase) (string, error) { - detacher := newDetacher(testcase) - mountPath := "/mnt/" + string(diskName) - return "", detacher.Detach(mountPath, nodeName) - }, - }, - // Detach fails { - name: "Detach_Negative", - diskIsAttached: diskIsAttachedCall{diskName, nodeName, false, diskCheckError}, - detach: detachCall{diskName, nodeName, "", detachError}, + name: "Detach_Negative", + detach: detachCall{diskName, nodeName, "", detachError}, test: func(testcase *testcase) (string, error) { detacher := newDetacher(testcase) mountPath := "/mnt/" + string(diskName) @@ -298,28 +270,8 @@ func (testcase *testcase) DetachDisk(diskName aws.KubernetesVolumeID, nodeName t } func (testcase *testcase) DiskIsAttached(diskName aws.KubernetesVolumeID, nodeName types.NodeName) (bool, error) { - expected := &testcase.diskIsAttached - - if expected.diskName == "" && expected.nodeName == "" { - // testcase.diskIsAttached looks uninitialized, test did not expect to - // call DiskIsAttached - testcase.t.Errorf("Unexpected DiskIsAttached call!") - return false, errors.New("Unexpected DiskIsAttached call!") - } - - if expected.diskName != diskName { - testcase.t.Errorf("Unexpected DiskIsAttached call: expected diskName %s, got %s", expected.diskName, diskName) - return false, 
errors.New("Unexpected DiskIsAttached call: wrong diskName") - } - - if expected.nodeName != nodeName { - testcase.t.Errorf("Unexpected DiskIsAttached call: expected nodeName %s, got %s", expected.nodeName, nodeName) - return false, errors.New("Unexpected DiskIsAttached call: wrong nodeName") - } - - glog.V(4).Infof("DiskIsAttached call: %s, %s, returning %v, %v", diskName, nodeName, expected.isAttached, expected.ret) - - return expected.isAttached, expected.ret + // DetachDisk no longer relies on DiskIsAttached api call + return false, nil } func (testcase *testcase) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) { From 69dd010a417661ae9aa60ced5416a86aa98e675c Mon Sep 17 00:00:00 2001 From: Marcin Owsiany Date: Mon, 27 Nov 2017 08:09:13 +0100 Subject: [PATCH 4/9] Unmute curl when fetching cfssl. --- hack/lib/util.sh | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hack/lib/util.sh b/hack/lib/util.sh index 69410a7fd64..1114cca8902 100755 --- a/hack/lib/util.sh +++ b/hack/lib/util.sh @@ -737,12 +737,12 @@ function kube::util::ensure-cfssl { kernel=$(uname -s) case "${kernel}" in Linux) - curl --retry 10 -s -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_linux-amd64 - curl --retry 10 -s -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64 + curl --retry 10 -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_linux-amd64 + curl --retry 10 -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_linux-amd64 ;; Darwin) - curl --retry 10 -s -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_darwin-amd64 - curl --retry 10 -s -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_darwin-amd64 + curl --retry 10 -L -o cfssl https://pkg.cfssl.org/R1.2/cfssl_darwin-amd64 + curl --retry 10 -L -o cfssljson https://pkg.cfssl.org/R1.2/cfssljson_darwin-amd64 ;; *) echo "Unknown, unsupported platform: ${kernel}." >&2 From b5710019994985a830c4efdb96ac9fde75dcddae Mon Sep 17 00:00:00 2001 From: Avesh Agarwal Date: Wed, 15 Nov 2017 14:42:02 -0500 Subject: [PATCH 5/9] Implement resource limit priority function. This function checks if the input pod's resource limits are satisfied by the input node's allocatable resources or not. If yes, the node is assigned a score of 1, otherwise the node's score is not changed. 
--- pkg/features/kube_features.go | 7 + .../pkg/scheduler/algorithm/priorities/BUILD | 2 + .../algorithm/priorities/resource_limits.go | 128 +++++++++++++++ .../priorities/resource_limits_test.go | 151 ++++++++++++++++++ .../algorithmprovider/defaults/defaults.go | 4 + 5 files changed, 292 insertions(+) create mode 100644 plugin/pkg/scheduler/algorithm/priorities/resource_limits.go create mode 100644 plugin/pkg/scheduler/algorithm/priorities/resource_limits_test.go diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 1ed30a62fda..5002f5e41ba 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -206,6 +206,12 @@ const ( // alpha: v1.9 // Postpone deletion of a persistent volume claim in case it is used by a pod PVCProtection utilfeature.Feature = "PVCProtection" + + // owner: @aveshagarwal + // alpha: v1.9 + // + // Enable resource limits priority function + ResourceLimitsPriorityFunction utilfeature.Feature = "ResourceLimitsPriorityFunction" ) func init() { @@ -244,6 +250,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS CustomPodDNS: {Default: false, PreRelease: utilfeature.Alpha}, BlockVolume: {Default: false, PreRelease: utilfeature.Alpha}, PVCProtection: {Default: false, PreRelease: utilfeature.Alpha}, + ResourceLimitsPriorityFunction: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/plugin/pkg/scheduler/algorithm/priorities/BUILD b/plugin/pkg/scheduler/algorithm/priorities/BUILD index 62913a89bd4..f473a44b4c2 100644 --- a/plugin/pkg/scheduler/algorithm/priorities/BUILD +++ b/plugin/pkg/scheduler/algorithm/priorities/BUILD @@ -19,6 +19,7 @@ go_library( "node_label.go", "node_prefer_avoid_pods.go", "reduce.go", + "resource_limits.go", "selector_spreading.go", "taint_toleration.go", "test_util.go", @@ -54,6 +55,7 @@ go_test( "node_affinity_test.go", "node_label_test.go", "node_prefer_avoid_pods_test.go", + "resource_limits_test.go", "selector_spreading_test.go", "taint_toleration_test.go", ], diff --git a/plugin/pkg/scheduler/algorithm/priorities/resource_limits.go b/plugin/pkg/scheduler/algorithm/priorities/resource_limits.go new file mode 100644 index 00000000000..77ae0dca923 --- /dev/null +++ b/plugin/pkg/scheduler/algorithm/priorities/resource_limits.go @@ -0,0 +1,128 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package priorities + +import ( + "fmt" + + "k8s.io/api/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" + + "github.com/golang/glog" +) + +// ResourceLimitsPriorityMap is a priority function that increases score of input node by 1 if the node satisfies +// input pod's resource limits. 
In detail, this priority function works as follows: If a node does not publish its +// allocatable resources (cpu and memory both), the node score is not affected. If a pod does not specify +// its cpu and memory limits both, the node score is not affected. If one or both of cpu and memory limits +// of the pod are satisfied, the node is assigned a score of 1. +// Rationale of choosing the lowest score of 1 is that this is mainly selected to break ties between nodes that have +// same scores assigned by one of least and most requested priority functions. +func ResourceLimitsPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (schedulerapi.HostPriority, error) { + node := nodeInfo.Node() + if node == nil { + return schedulerapi.HostPriority{}, fmt.Errorf("node not found") + } + + allocatableResources := nodeInfo.AllocatableResource() + + // compute pod limits + podLimits := getResourceLimits(pod) + + cpuScore := computeScore(podLimits.MilliCPU, allocatableResources.MilliCPU) + memScore := computeScore(podLimits.Memory, allocatableResources.Memory) + + score := int(0) + if cpuScore == 1 || memScore == 1 { + score = 1 + } + + if glog.V(10) { + // We explicitly don't do glog.V(10).Infof() to avoid computing all the parameters if this is + // not logged. There is visible performance gain from it. + glog.Infof( + "%v -> %v: Resource Limits Priority, allocatable %d millicores %d memory bytes, pod limits %d millicores %d memory bytes, score %d", + pod.Name, node.Name, + allocatableResources.MilliCPU, allocatableResources.Memory, + podLimits.MilliCPU, podLimits.Memory, + score, + ) + } + + return schedulerapi.HostPriority{ + Host: node.Name, + Score: score, + }, nil +} + +// computeScore return 1 if limit value is less than or equal to allocable +// value, otherwise it returns 0. +func computeScore(limit, allocatable int64) int64 { + if limit != 0 && allocatable != 0 && limit <= allocatable { + return 1 + } + return 0 +} + +// getResourceLimits computes resource limits for input pod. +// The reason to create this new function is to be consistent with other +// priority functions because most or perhaps all priority functions work +// with schedulercache.Resource. +// TODO: cache it as part of metadata passed to priority functions. +func getResourceLimits(pod *v1.Pod) *schedulercache.Resource { + result := &schedulercache.Resource{} + for _, container := range pod.Spec.Containers { + result.Add(container.Resources.Limits) + } + + // take max_resource(sum_pod, any_init_container) + for _, container := range pod.Spec.InitContainers { + for rName, rQuantity := range container.Resources.Limits { + switch rName { + case v1.ResourceMemory: + if mem := rQuantity.Value(); mem > result.Memory { + result.Memory = mem + } + case v1.ResourceCPU: + if cpu := rQuantity.MilliValue(); cpu > result.MilliCPU { + result.MilliCPU = cpu + } + // keeping these resources though score computation in other priority functions and in this + // are only computed based on cpu and memory only. 
+ case v1.ResourceEphemeralStorage: + if ephemeralStorage := rQuantity.Value(); ephemeralStorage > result.EphemeralStorage { + result.EphemeralStorage = ephemeralStorage + } + case v1.ResourceNvidiaGPU: + if gpu := rQuantity.Value(); gpu > result.NvidiaGPU { + result.NvidiaGPU = gpu + } + default: + if v1helper.IsScalarResourceName(rName) { + value := rQuantity.Value() + if value > result.ScalarResources[rName] { + result.SetScalar(rName, value) + } + } + } + } + } + + return result +} diff --git a/plugin/pkg/scheduler/algorithm/priorities/resource_limits_test.go b/plugin/pkg/scheduler/algorithm/priorities/resource_limits_test.go new file mode 100644 index 00000000000..0a48cc73308 --- /dev/null +++ b/plugin/pkg/scheduler/algorithm/priorities/resource_limits_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package priorities + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + //metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" + "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" +) + +func TestResourceLimistPriority(t *testing.T) { + noResources := v1.PodSpec{ + Containers: []v1.Container{}, + } + + cpuOnly := v1.PodSpec{ + NodeName: "machine1", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("0"), + }, + }, + }, + }, + } + + memOnly := v1.PodSpec{ + NodeName: "machine2", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + v1.ResourceMemory: resource.MustParse("2000"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("0"), + v1.ResourceMemory: resource.MustParse("3000"), + }, + }, + }, + }, + } + + cpuAndMemory := v1.PodSpec{ + NodeName: "machine2", + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("1000m"), + v1.ResourceMemory: resource.MustParse("2000"), + }, + }, + }, + { + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("2000m"), + v1.ResourceMemory: resource.MustParse("3000"), + }, + }, + }, + }, + } + + tests := []struct { + // input pod + pod *v1.Pod + nodes []*v1.Node + expectedList schedulerapi.HostPriorityList + test string + }{ + { + pod: &v1.Pod{Spec: noResources}, + nodes: []*v1.Node{makeNode("machine1", 4000, 10000), makeNode("machine2", 4000, 0), makeNode("machine3", 0, 10000), makeNode("machine4", 0, 0)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 0}, 
{Host: "machine3", Score: 0}, {Host: "machine4", Score: 0}}, + test: "pod does not specify its resource limits", + }, + { + pod: &v1.Pod{Spec: cpuOnly}, + nodes: []*v1.Node{makeNode("machine1", 3000, 10000), makeNode("machine2", 2000, 10000)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 0}}, + test: "pod only specifies cpu limits", + }, + { + pod: &v1.Pod{Spec: memOnly}, + nodes: []*v1.Node{makeNode("machine1", 4000, 4000), makeNode("machine2", 5000, 10000)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}, {Host: "machine2", Score: 1}}, + test: "pod only specifies mem limits", + }, + { + pod: &v1.Pod{Spec: cpuAndMemory}, + nodes: []*v1.Node{makeNode("machine1", 4000, 4000), makeNode("machine2", 5000, 10000)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 1}, {Host: "machine2", Score: 1}}, + test: "pod specifies both cpu and mem limits", + }, + { + pod: &v1.Pod{Spec: cpuAndMemory}, + nodes: []*v1.Node{makeNode("machine1", 0, 0)}, + expectedList: []schedulerapi.HostPriority{{Host: "machine1", Score: 0}}, + test: "node does not advertise its allocatables", + }, + } + + for _, test := range tests { + nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(nil, test.nodes) + list, err := priorityFunction(ResourceLimitsPriorityMap, nil, nil)(test.pod, nodeNameToInfo, test.nodes) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(test.expectedList, list) { + t.Errorf("%s: expected %#v, got %#v", test.test, test.expectedList, list) + } + } + +} diff --git a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 5bb5f136192..1b7b34d96bd 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -106,6 +106,10 @@ func init() { factory.RegisterPriorityFunction2("ImageLocalityPriority", priorities.ImageLocalityPriorityMap, nil, 1) // Optional, cluster-autoscaler friendly priority function - give used nodes higher priority. factory.RegisterPriorityFunction2("MostRequestedPriority", priorities.MostRequestedPriorityMap, nil, 1) + // Prioritizes nodes that satisfy pod's resource limits + if utilfeature.DefaultFeatureGate.Enabled(features.ResourceLimitsPriorityFunction) { + factory.RegisterPriorityFunction2("ResourceLimitsPriority", priorities.ResourceLimitsPriorityMap, nil, 1) + } } func defaultPredicates() sets.String { From b5c1eff3757a94492b006fe792c5cc4ab5213bc2 Mon Sep 17 00:00:00 2001 From: Shyam Jeedigunta Date: Tue, 28 Nov 2017 11:55:44 +0100 Subject: [PATCH 6/9] Allow choosing min CPU architecture for master VM on gce --- cluster/gce/config-default.sh | 1 + cluster/gce/config-test.sh | 1 + cluster/gce/container-linux/master-helper.sh | 6 ++++++ cluster/gce/gci/master-helper.sh | 6 ++++++ 4 files changed, 14 insertions(+) diff --git a/cluster/gce/config-default.sh b/cluster/gce/config-default.sh index 24b4c389a99..78aa9740b64 100755 --- a/cluster/gce/config-default.sh +++ b/cluster/gce/config-default.sh @@ -30,6 +30,7 @@ REGIONAL_KUBE_ADDONS=${REGIONAL_KUBE_ADDONS:-true} NODE_SIZE=${NODE_SIZE:-n1-standard-2} NUM_NODES=${NUM_NODES:-3} MASTER_SIZE=${MASTER_SIZE:-n1-standard-$(get-master-size)} +MASTER_MIN_CPU_ARCHITECTURE=${MASTER_MIN_CPU_ARCHITECTURE:-} # To allow choosing better architectures. 
MASTER_DISK_TYPE=pd-ssd MASTER_DISK_SIZE=${MASTER_DISK_SIZE:-$(get-master-disk-size)} MASTER_ROOT_DISK_SIZE=${MASTER_ROOT_DISK_SIZE:-$(get-master-root-disk-size)} diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index f9f223e1e9c..12a45ddad8d 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -30,6 +30,7 @@ REGIONAL_KUBE_ADDONS=${REGIONAL_KUBE_ADDONS:-true} NODE_SIZE=${NODE_SIZE:-n1-standard-2} NUM_NODES=${NUM_NODES:-3} MASTER_SIZE=${MASTER_SIZE:-n1-standard-$(get-master-size)} +MASTER_MIN_CPU_ARCHITECTURE=${MASTER_MIN_CPU_ARCHITECTURE:-} # To allow choosing better architectures. MASTER_DISK_TYPE=pd-ssd MASTER_DISK_SIZE=${MASTER_DISK_SIZE:-$(get-master-disk-size)} MASTER_ROOT_DISK_SIZE=${MASTER_ROOT_DISK_SIZE:-$(get-master-root-disk-size)} diff --git a/cluster/gce/container-linux/master-helper.sh b/cluster/gce/container-linux/master-helper.sh index e727349f2c7..c9b74613b92 100755 --- a/cluster/gce/container-linux/master-helper.sh +++ b/cluster/gce/container-linux/master-helper.sh @@ -79,6 +79,11 @@ function create-master-instance-internal() { preemptible_master="--preemptible --maintenance-policy TERMINATE" fi + local min_cpu_platform="" + if [[ -n "${MASTER_MIN_CPU_ARCHITECTURE:-}" ]]; then + min_cpu_platform="--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}" + fi + local network=$(make-gcloud-network-argument \ "${NETWORK_PROJECT}" "${REGION}" "${NETWORK}" "${SUBNETWORK:-}" \ "${address:-}" "${ENABLE_IP_ALIASES:-}" "${IP_ALIAS_SIZE:-}") @@ -107,6 +112,7 @@ function create-master-instance-internal() { --disk "${disk}" \ --boot-disk-size "${MASTER_ROOT_DISK_SIZE}" \ ${preemptible_master} \ + ${min_cpu_platform} \ ${network} 2>&1); then echo "${result}" >&2 return 0 diff --git a/cluster/gce/gci/master-helper.sh b/cluster/gce/gci/master-helper.sh index def861d20bb..e08484317c9 100755 --- a/cluster/gce/gci/master-helper.sh +++ b/cluster/gce/gci/master-helper.sh @@ -94,6 +94,11 @@ function create-master-instance-internal() { preemptible_master="--preemptible --maintenance-policy TERMINATE" fi + local min_cpu_platform="" + if [[ -n "${MASTER_MIN_CPU_ARCHITECTURE:-}" ]]; then + min_cpu_platform="--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}" + fi + local network=$(make-gcloud-network-argument \ "${NETWORK_PROJECT}" "${REGION}" "${NETWORK}" "${SUBNETWORK:-}" \ "${address:-}" "${ENABLE_IP_ALIASES:-}" "${IP_ALIAS_SIZE:-}") @@ -127,6 +132,7 @@ function create-master-instance-internal() { --disk "${disk}" \ --boot-disk-size "${MASTER_ROOT_DISK_SIZE}" \ ${preemptible_master} \ + ${min_cpu_platform} \ ${network} 2>&1); then echo "${result}" >&2 return 0 From 63f7836d39eac2ddf903208b6d49575f701ee11e Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 28 Nov 2017 15:32:18 -0800 Subject: [PATCH 7/9] mock container networking and fix filtering bug --- .../dockershim/libdocker/fake_client.go | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/dockershim/libdocker/fake_client.go b/pkg/kubelet/dockershim/libdocker/fake_client.go index 529c9141991..4474fa4a5b6 100644 --- a/pkg/kubelet/dockershim/libdocker/fake_client.go +++ b/pkg/kubelet/dockershim/libdocker/fake_client.go @@ -410,7 +410,7 @@ func (f *FakeDockerClient) ListContainers(options dockertypes.ContainerListOptio var filtered []dockertypes.Container for _, container := range containerList { for _, statusFilter := range statusFilters { - if container.Status == statusFilter { + if toDockerContainerStatus(container.Status) == statusFilter { filtered = 
append(filtered, container) break } @@ -443,6 +443,19 @@ func (f *FakeDockerClient) ListContainers(options dockertypes.ContainerListOptio return containerList, err } +func toDockerContainerStatus(state string) string { + switch { + case strings.HasPrefix(state, StatusCreatedPrefix): + return "created" + case strings.HasPrefix(state, StatusRunningPrefix): + return "running" + case strings.HasPrefix(state, StatusExitedPrefix): + return "exited" + default: + return "unknown" + } +} + // InspectContainer is a test-spy implementation of Interface.InspectContainer. // It adds an entry "inspect" to the internal method call record. func (f *FakeDockerClient) InspectContainer(id string) (*dockertypes.ContainerJSON, error) { @@ -565,6 +578,18 @@ func (f *FakeDockerClient) StartContainer(id string) error { } f.appendContainerTrace("Started", id) container, ok := f.ContainerMap[id] + if container.HostConfig.NetworkMode.IsContainer() { + hostContainerID := container.HostConfig.NetworkMode.ConnectedContainer() + found := false + for _, container := range f.RunningContainerList { + if container.ID == hostContainerID { + found = true + } + } + if !found { + return fmt.Errorf("failed to start container \"%s\": Error response from daemon: cannot join network of a non running container: %s", id, hostContainerID) + } + } timestamp := f.Clock.Now() if !ok { container = convertFakeContainer(&FakeContainer{ID: id, Name: id, CreatedAt: timestamp}) From 04c45e10db29aff461ba9690ad3fe492e6f80119 Mon Sep 17 00:00:00 2001 From: Josh Horwitz Date: Tue, 28 Nov 2017 18:59:21 -0500 Subject: [PATCH 8/9] Revert "Merge pull request #55336 from oracle/for/upstream/master/53462" This reverts commit ccb15fb498352cad3f582cce0ade4e99efe0f82f, reversing changes made to 49040376458b263c67d22db76461cc9069e22b0d. --- pkg/controller/service/BUILD | 2 - pkg/controller/service/service_controller.go | 72 ++++---- .../service/service_controller_test.go | 161 ++++-------------- 3 files changed, 72 insertions(+), 163 deletions(-) diff --git a/pkg/controller/service/BUILD b/pkg/controller/service/BUILD index c84ef3bd367..cfb191f9ca0 100644 --- a/pkg/controller/service/BUILD +++ b/pkg/controller/service/BUILD @@ -50,8 +50,6 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", - "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", - "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go index b3cba8a9312..38c6b99da2c 100644 --- a/pkg/controller/service/service_controller.go +++ b/pkg/controller/service/service_controller.go @@ -89,6 +89,7 @@ type serviceCache struct { type ServiceController struct { cloud cloudprovider.Interface knownHosts []*v1.Node + servicesToUpdate []*v1.Service kubeClient clientset.Interface clusterName string balancer cloudprovider.LoadBalancer @@ -243,20 +244,6 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s } } } - - if cachedService.state != nil { - if !s.needsUpdate(cachedService.state, service) { - // The service does not require an update which means it was placed on the work queue - // by the node sync loop and indicates that the hosts need to be updated. 
- err := s.updateLoadBalancerHosts(service) - if err != nil { - return err, cachedService.nextRetryDelay() - } - cachedService.resetRetryDelay() - return nil, doNotRetry - } - } - // cache the service, we need the info for service deletion cachedService.state = service err, retry := s.createLoadBalancerIfNeeded(key, service) @@ -451,8 +438,6 @@ func (s *serviceCache) delete(serviceName string) { delete(s.serviceMap, serviceName) } -// needsUpdate checks to see if there were any changes between the old and new service that would require a load balancer update. -// This method does not and should not check if the hosts have changed. func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool { if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) { return false @@ -651,45 +636,62 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate { } } -// nodeSyncLoop handles adding all existing cached services to the work queue -// to be reprocessed so that they can have their hosts updated, if any -// host changes have occurred since the last sync loop. +// nodeSyncLoop handles updating the hosts pointed to by all load +// balancers whenever the set of nodes in the cluster changes. func (s *ServiceController) nodeSyncLoop() { newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate()) if err != nil { glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err) return } - if nodeSlicesEqualForLB(newHosts, s.knownHosts) { - // Nothing to do since the hosts have not changed. + // The set of nodes in the cluster hasn't changed, but we can retry + // updating any services that we failed to update last time around. + s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) return } - glog.Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts)) + glog.Infof("Detected change in list of current cluster nodes. New node set: %v", + nodeNames(newHosts)) - for _, svc := range s.cache.allServices() { - s.enqueueService(svc) - } + // Try updating all services, and save the ones that fail to try again next + // round. + s.servicesToUpdate = s.cache.allServices() + numServices := len(s.servicesToUpdate) + s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) + glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", + numServices-len(s.servicesToUpdate), numServices) - // Update the known hosts so we can check next sync loop for changes. s.knownHosts = newHosts } -// Updates the load balancer of the service with updated nodes ONLY. -// This method will not trigger the cloud provider to create or full update a load balancer. -func (s *ServiceController) updateLoadBalancerHosts(service *v1.Service) error { +// updateLoadBalancerHosts updates all existing load balancers so that +// they will match the list of hosts provided. +// Returns the list of services that couldn't be updated. 
+func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) { + for _, service := range services { + func() { + if service == nil { + return + } + if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil { + glog.Errorf("External error while updating load balancer: %v.", err) + servicesToRetry = append(servicesToRetry, service) + } + }() + } + return servicesToRetry +} + +// Updates the load balancer of a service, assuming we hold the mutex +// associated with the service. +func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { if !wantsLoadBalancer(service) { return nil } - hosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate()) - if err != nil { - return err - } - // This operation doesn't normally take very long (and happens pretty often), so we only record the final event - err = s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts) + err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts) if err == nil { // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. if len(hosts) == 0 { diff --git a/pkg/controller/service/service_controller_test.go b/pkg/controller/service/service_controller_test.go index ce99574936e..0c4990adb1a 100644 --- a/pkg/controller/service/service_controller_test.go +++ b/pkg/controller/service/service_controller_test.go @@ -19,7 +19,6 @@ package service import ( "fmt" "reflect" - "sort" "testing" "time" @@ -28,8 +27,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/testapi" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" @@ -177,45 +174,23 @@ func TestCreateExternalLoadBalancer(t *testing.T) { } } -// newLoadBalancerNode returns a node that passes the predicate check for a -// node to receive load balancer traffic. -func newLoadBalancerNode(name string) *v1.Node { - return &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - Spec: v1.NodeSpec{ - Unschedulable: false, - }, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - {Type: v1.NodeReady, Status: v1.ConditionTrue}, - }, - }, - } -} - -func sortNodesByName(nodes []*v1.Node) { - sort.Slice(nodes, func(i, j int) bool { - return nodes[i].Name < nodes[j].Name - }) -} - // TODO: Finish converting and update comments func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { - nodes := []*v1.Node{ - newLoadBalancerNode("node0"), - newLoadBalancerNode("node1"), - newLoadBalancerNode("node73"), + {ObjectMeta: metav1.ObjectMeta{Name: "node0"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "node73"}}, } - sortNodesByName(nodes) - - table := map[string]struct { + table := []struct { services []*v1.Service expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall }{ - "update no load balancer": { + { + // No services present: no calls should be made. + services: []*v1.Service{}, + expectedUpdateCalls: nil, + }, + { // Services do not have external load balancers: no calls should be made. 
services: []*v1.Service{ newService("s0", "111", v1.ServiceTypeClusterIP), @@ -223,7 +198,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { }, expectedUpdateCalls: nil, }, - "update 1 load balancer": { + { // Services does have an external load balancer: one call should be made. services: []*v1.Service{ newService("s0", "333", v1.ServiceTypeLoadBalancer), @@ -232,7 +207,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, - "update 3 load balancers": { + { // Three services have an external load balancer: three calls. services: []*v1.Service{ newService("s0", "444", v1.ServiceTypeLoadBalancer), @@ -245,7 +220,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, - "update 2 load balancers": { + { // Two services have an external load balancer and two don't: two calls. services: []*v1.Service{ newService("s0", "777", v1.ServiceTypeNodePort), @@ -258,44 +233,30 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, + { + // One service has an external load balancer and one is nil: one call. + services: []*v1.Service{ + newService("s0", "234", v1.ServiceTypeLoadBalancer), + nil, + }, + expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ + {Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes}, + }, + }, } + for _, item := range table { + controller, cloud, _ := newController() - for name, item := range table { - t.Run(name, func(t *testing.T) { - controller, cloud, _ := newController() - - var services []*v1.Service - for _, service := range item.services { - services = append(services, service) - } - nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) - for _, node := range nodes { - nodeIndexer.Add(node) - } - controller.nodeLister = corelisters.NewNodeLister(nodeIndexer) - - for _, service := range services { - if err := controller.updateLoadBalancerHosts(service); err != nil { - t.Errorf("unexpected error: %v", err) - } - } - - if len(item.expectedUpdateCalls) != len(cloud.UpdateCalls) { - t.Errorf("expected %d update calls but only got %d", len(item.expectedUpdateCalls), len(cloud.UpdateCalls)) - } - - for i, expectedCall := range item.expectedUpdateCalls { - actualCall := cloud.UpdateCalls[i] - if !reflect.DeepEqual(expectedCall.Service, actualCall.Service) { - t.Errorf("expected update call to contain service %+v, got %+v", expectedCall.Service, actualCall.Service) - } - - sortNodesByName(actualCall.Hosts) - if !reflect.DeepEqual(expectedCall.Hosts, actualCall.Hosts) { - t.Errorf("expected update call to contain hosts %+v, got %+v", expectedCall.Hosts, actualCall.Hosts) - } - } - }) + var services []*v1.Service + for _, service := range item.services { + services = append(services, service) + } + if err := controller.updateLoadBalancerHosts(services, nodes); err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) { + t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls) + } } } @@ -350,13 +311,6 @@ func TestProcessServiceUpdate(t *testing.T) { var controller *ServiceController var cloud *fakecloud.FakeCloud - nodes := []*v1.Node{ - newLoadBalancerNode("node0"), - newLoadBalancerNode("node1"), - 
newLoadBalancerNode("node73"), - } - sortNodesByName(nodes) - //A pair of old and new loadbalancer IP address oldLBIP := "192.168.1.1" newLBIP := "192.168.1.11" @@ -390,51 +344,6 @@ func TestProcessServiceUpdate(t *testing.T) { return nil }, }, - { - testName: "If updating hosts only", - key: "default/sync-test-name", - svc: newService("sync-test-name", types.UID("sync-test-uid"), v1.ServiceTypeLoadBalancer), - updateFn: func(svc *v1.Service) *v1.Service { - keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName() - cachedServiceTest := controller.cache.getOrCreate(keyExpected) - cachedServiceTest.state = svc - controller.cache.set(keyExpected, cachedServiceTest) - - // Set the nodes for the cloud's UpdateLoadBalancer call to use. - nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) - for _, node := range nodes { - nodeIndexer.Add(node) - } - controller.nodeLister = corelisters.NewNodeLister(nodeIndexer) - - // This should trigger the needsUpdate false check since the service equals the cached service - return svc - }, - expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error { - if err != nil { - return err - } - if retryDuration != doNotRetry { - return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration) - } - - if len(cloud.UpdateCalls) != 1 { - return fmt.Errorf("expected one update host call but only got %+v", cloud.UpdateCalls) - } - - actualCall := cloud.UpdateCalls[0] - if !reflect.DeepEqual(svc, actualCall.Service) { - return fmt.Errorf("expected update call to contain service %+v, got %+v", svc, actualCall.Service) - } - - sortNodesByName(actualCall.Hosts) - if !reflect.DeepEqual(nodes, actualCall.Hosts) { - return fmt.Errorf("expected update call to contain hosts %+v, got %+v", nodes, actualCall.Hosts) - } - - return nil - }, - }, { testName: "If Updating Loadbalancer IP", key: "default/sync-test-name", From 46f17fed8abcf9e5f575f0049ea61933b9edb986 Mon Sep 17 00:00:00 2001 From: Shyam Jeedigunta Date: Wed, 29 Nov 2017 10:31:41 +0100 Subject: [PATCH 9/9] Fix --min-cpu-platform argument to gcloud in kube-up --- cluster/gce/container-linux/master-helper.sh | 7 +------ cluster/gce/gci/master-helper.sh | 7 +------ 2 files changed, 2 insertions(+), 12 deletions(-) diff --git a/cluster/gce/container-linux/master-helper.sh b/cluster/gce/container-linux/master-helper.sh index c9b74613b92..3cd3ee3a3e5 100755 --- a/cluster/gce/container-linux/master-helper.sh +++ b/cluster/gce/container-linux/master-helper.sh @@ -79,11 +79,6 @@ function create-master-instance-internal() { preemptible_master="--preemptible --maintenance-policy TERMINATE" fi - local min_cpu_platform="" - if [[ -n "${MASTER_MIN_CPU_ARCHITECTURE:-}" ]]; then - min_cpu_platform="--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}" - fi - local network=$(make-gcloud-network-argument \ "${NETWORK_PROJECT}" "${REGION}" "${NETWORK}" "${SUBNETWORK:-}" \ "${address:-}" "${ENABLE_IP_ALIASES:-}" "${IP_ALIAS_SIZE:-}") @@ -111,8 +106,8 @@ function create-master-instance-internal() { --metadata-from-file "${metadata}" \ --disk "${disk}" \ --boot-disk-size "${MASTER_ROOT_DISK_SIZE}" \ + ${MASTER_MIN_CPU_ARCHITECTURE:+"--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}"} \ ${preemptible_master} \ - ${min_cpu_platform} \ ${network} 2>&1); then echo "${result}" >&2 return 0 diff --git a/cluster/gce/gci/master-helper.sh b/cluster/gce/gci/master-helper.sh index e08484317c9..e6fac224cf9 100755 --- a/cluster/gce/gci/master-helper.sh +++ 
b/cluster/gce/gci/master-helper.sh @@ -94,11 +94,6 @@ function create-master-instance-internal() { preemptible_master="--preemptible --maintenance-policy TERMINATE" fi - local min_cpu_platform="" - if [[ -n "${MASTER_MIN_CPU_ARCHITECTURE:-}" ]]; then - min_cpu_platform="--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}" - fi - local network=$(make-gcloud-network-argument \ "${NETWORK_PROJECT}" "${REGION}" "${NETWORK}" "${SUBNETWORK:-}" \ "${address:-}" "${ENABLE_IP_ALIASES:-}" "${IP_ALIAS_SIZE:-}") @@ -131,8 +126,8 @@ function create-master-instance-internal() { --metadata-from-file "${metadata}" \ --disk "${disk}" \ --boot-disk-size "${MASTER_ROOT_DISK_SIZE}" \ + ${MASTER_MIN_CPU_ARCHITECTURE:+"--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}"} \ ${preemptible_master} \ - ${min_cpu_platform} \ ${network} 2>&1); then echo "${result}" >&2 return 0
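
The replacement lines in both master-helper.sh files rely on the shell's ${parameter:+word} expansion: the --min-cpu-platform flag is emitted only when MASTER_MIN_CPU_ARCHITECTURE is set and non-empty, and it reaches gcloud as a single argument even when the value contains a space (e.g. "Intel Broadwell"), which the earlier unquoted ${min_cpu_platform} variable introduced in the previous patch would have word-split (presumably the motivation for this follow-up). A small self-contained sketch of that behavior (the command and values are illustrative only):

    #!/usr/bin/env bash
    # Print each received argument in brackets so word boundaries are visible.
    demo() { printf '[%s]' "$@"; printf '\n'; }

    MASTER_MIN_CPU_ARCHITECTURE=""
    demo create kubernetes-master \
      ${MASTER_MIN_CPU_ARCHITECTURE:+"--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}"}
    # Output: [create][kubernetes-master]            (flag omitted entirely)

    MASTER_MIN_CPU_ARCHITECTURE="Intel Broadwell"
    demo create kubernetes-master \
      ${MASTER_MIN_CPU_ARCHITECTURE:+"--min-cpu-platform=${MASTER_MIN_CPU_ARCHITECTURE}"}
    # Output: [create][kubernetes-master][--min-cpu-platform=Intel Broadwell]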