Merge pull request #45330 from NickrenREN/openstack-backoff

Automatic merge from submit-queue (batch tested with PRs 45018, 45330)

Add exponential backoff to openstack loadbalancer functions

Use exponential backoff when polling OpenStack load balancer and volume operations, to lower the load on OpenStack and reduce API call throttling.
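
For readers unfamiliar with the pattern, here is a minimal, self-contained sketch (not taken from this PR) of how `wait.ExponentialBackoff` from `k8s.io/apimachinery/pkg/util/wait` is typically used; the condition function below is a hypothetical placeholder, while the code in this PR polls the load balancer provisioning status, load balancer deletion, and Cinder volume status. With an initial delay of 1s, a factor of 1.2, and 19 steps, the sleeps form a geometric series (1 + 1.2 + ... + 1.2^17 ≈ 128s), which roughly matches the previous flat 120s timeout.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// Same shape of configuration as the loadbalancerActive* constants
	// introduced by this PR: start at 1s, grow 1.2x per step, 19 attempts max.
	backoff := wait.Backoff{
		Duration: 1 * time.Second,
		Factor:   1.2,
		Steps:    19,
	}

	attempt := 0
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		attempt++
		// Hypothetical readiness check: return (true, nil) when done,
		// (false, nil) to retry after the next backoff delay,
		// or (false, err) to abort immediately.
		return attempt >= 3, nil
	})
	if err == wait.ErrWaitTimeout {
		// Mirrors how the PR maps a backoff timeout to a descriptive error.
		err = fmt.Errorf("resource did not reach the desired state within the allotted time")
	}
	fmt.Printf("attempts: %d, err: %v\n", attempt, err)
}
```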


**Release note**:

```release-note
NONE
```
Kubernetes Submit Queue authored on 2017-05-08 23:00:38 -07:00; committed by GitHub.
commit 20fa30e4b5
3 changed files with 97 additions and 84 deletions


```diff
@@ -56,6 +56,7 @@ go_library(
         "//vendor/gopkg.in/gcfg.v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//vendor/k8s.io/client-go/util/cert:go_default_library",
     ],
 )
@@ -78,6 +79,7 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
     ],
 )
```


```diff
@@ -39,6 +39,7 @@ import (
 	neutronports "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
 	"github.com/gophercloud/gophercloud/pagination"
 
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/api/v1/service"
 	"k8s.io/kubernetes/pkg/cloudprovider"
@@ -46,8 +47,26 @@ import (
 
 // Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
 // this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
-const loadbalancerActiveTimeoutSeconds = 120
-const loadbalancerDeleteTimeoutSeconds = 30
+const (
+	// loadbalancerActive* is configuration of exponential backoff for
+	// going into ACTIVE loadbalancer provisioning status. Starting with 1
+	// seconds, multiplying by 1.2 with each step and taking 19 steps at maximum
+	// it will time out after 128s, which roughly corresponds to 120s
+	loadbalancerActiveInitDealy = 1 * time.Second
+	loadbalancerActiveFactor    = 1.2
+	loadbalancerActiveSteps     = 19
+
+	// loadbalancerDelete* is configuration of exponential backoff for
+	// waiting for delete operation to complete. Starting with 1
+	// seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
+	// it will time out after 32s, which roughly corresponds to 30s
+	loadbalancerDeleteInitDealy = 1 * time.Second
+	loadbalancerDeleteFactor    = 1.2
+	loadbalancerDeleteSteps     = 13
+
+	activeStatus = "ACTIVE"
+	errorStatus  = "ERROR"
+)
 
 // LoadBalancer implementation for LBaaS v1
 type LbaasV1 struct {
@@ -337,44 +356,6 @@ func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2pools
 	return members, nil
 }
 
-// Each pool has exactly one or zero monitors. ListOpts does not seem to filter anything.
-func getMonitorByPoolID(client *gophercloud.ServiceClient, id string) (*v2monitors.Monitor, error) {
-	var monitorList []v2monitors.Monitor
-	err := v2monitors.List(client, v2monitors.ListOpts{PoolID: id}).EachPage(func(page pagination.Page) (bool, error) {
-		monitorsList, err := v2monitors.ExtractMonitors(page)
-		if err != nil {
-			return false, err
-		}
-		for _, monitor := range monitorsList {
-			// bugfix, filter by poolid
-			for _, pool := range monitor.Pools {
-				if pool.ID == id {
-					monitorList = append(monitorList, monitor)
-				}
-			}
-		}
-		if len(monitorList) > 1 {
-			return false, ErrMultipleResults
-		}
-		return true, nil
-	})
-	if err != nil {
-		if isNotFound(err) {
-			return nil, ErrNotFound
-		}
-		return nil, err
-	}
-
-	if len(monitorList) == 0 {
-		return nil, ErrNotFound
-	} else if len(monitorList) > 1 {
-		return nil, ErrMultipleResults
-	}
-
-	return &monitorList[0], nil
-}
-
 // Check if a member exists for node
 func memberExists(members []v2pools.Member, addr string, port int) bool {
 	for _, member := range members {
@@ -436,45 +417,59 @@ func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpt
 }
 
 func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
-	start := time.Now().Second()
-	for {
-		loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
-		if err != nil {
-			return "", err
-		}
-		if loadbalancer.ProvisioningStatus == "ACTIVE" {
-			return "ACTIVE", nil
-		} else if loadbalancer.ProvisioningStatus == "ERROR" {
-			return "ERROR", fmt.Errorf("Loadbalancer has gone into ERROR state")
-		}
-
-		time.Sleep(1 * time.Second)
-		if time.Now().Second()-start >= loadbalancerActiveTimeoutSeconds {
-			return loadbalancer.ProvisioningStatus, fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
-		}
+	backoff := wait.Backoff{
+		Duration: loadbalancerActiveInitDealy,
+		Factor:   loadbalancerActiveFactor,
+		Steps:    loadbalancerActiveSteps,
 	}
+
+	var provisioningStatus string
+	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
+		if err != nil {
+			return false, err
+		}
+		provisioningStatus = loadbalancer.ProvisioningStatus
+		if loadbalancer.ProvisioningStatus == activeStatus {
+			return true, nil
+		} else if loadbalancer.ProvisioningStatus == errorStatus {
+			return true, fmt.Errorf("Loadbalancer has gone into ERROR state")
+		} else {
+			return false, nil
+		}
+	})
+
+	if err == wait.ErrWaitTimeout {
+		err = fmt.Errorf("Loadbalancer failed to go into ACTIVE provisioning status within alloted time")
+	}
+	return provisioningStatus, err
 }
 
 func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
-	start := time.Now().Second()
-	for {
-		_, err := loadbalancers.Get(client, loadbalancerID).Extract()
-		if err != nil {
-			if err == ErrNotFound {
-				return nil
-			} else {
-				return err
-			}
-		}
-
-		time.Sleep(1 * time.Second)
-		if time.Now().Second()-start >= loadbalancerDeleteTimeoutSeconds {
-			return fmt.Errorf("Loadbalancer failed to delete within the alloted time")
-		}
+	backoff := wait.Backoff{
+		Duration: loadbalancerDeleteInitDealy,
+		Factor:   loadbalancerDeleteFactor,
+		Steps:    loadbalancerDeleteSteps,
 	}
+	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		_, err := loadbalancers.Get(client, loadbalancerID).Extract()
+		if err != nil {
+			if err == ErrNotFound {
+				return true, nil
+			} else {
+				return false, err
+			}
+		} else {
+			return false, nil
+		}
+	})
+
+	if err == wait.ErrWaitTimeout {
+		err = fmt.Errorf("Loadbalancer failed to delete within the alloted time")
+	}
+	return err
 }
 
 func toRuleProtocol(protocol v1.Protocol) rules.RuleProtocol {
```


```diff
@@ -30,39 +30,55 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/api/v1"
 )
 
-const volumeAvailableStatus = "available"
-const volumeInUseStatus = "in-use"
-const volumeCreateTimeoutSeconds = 30
-const testClusterName = "testCluster"
+const (
+	volumeAvailableStatus = "available"
+	volumeInUseStatus     = "in-use"
+	testClusterName       = "testCluster"
+
+	volumeStatusTimeoutSeconds = 30
+	// volumeStatus* is configuration of exponential backoff for
+	// waiting for specified volume status. Starting with 1
+	// seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
+	// it will time out after 32s, which roughly corresponds to 30s
+	volumeStatusInitDealy = 1 * time.Second
+	volumeStatusFactor    = 1.2
+	volumeStatusSteps     = 13
+)
 
-func WaitForVolumeStatus(t *testing.T, os *OpenStack, volumeName string, status string, timeoutSeconds int) {
-	timeout := timeoutSeconds
-	start := time.Now().Second()
-	for {
-		time.Sleep(1 * time.Second)
-
-		if timeout >= 0 && time.Now().Second()-start >= timeout {
-			t.Logf("Volume (%s) status did not change to %s after %v seconds\n",
-				volumeName,
-				status,
-				timeout)
-			return
-		}
-
-		getVol, err := os.getVolume(volumeName)
-		if err != nil {
-			t.Fatalf("Cannot get existing Cinder volume (%s): %v", volumeName, err)
-		}
-		if getVol.Status == status {
-			t.Logf("Volume (%s) status changed to %s after %v seconds\n",
-				volumeName,
-				status,
-				timeout)
-			return
-		}
-	}
+func WaitForVolumeStatus(t *testing.T, os *OpenStack, volumeName string, status string) {
+	backoff := wait.Backoff{
+		Duration: volumeStatusInitDealy,
+		Factor:   volumeStatusFactor,
+		Steps:    volumeStatusSteps,
+	}
+	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
+		getVol, err := os.getVolume(volumeName)
+		if err != nil {
+			return false, err
+		}
+		if getVol.Status == status {
+			t.Logf("Volume (%s) status changed to %s after %v seconds\n",
+				volumeName,
+				status,
+				volumeStatusTimeoutSeconds)
+			return true, nil
+		} else {
+			return false, nil
+		}
+	})
+	if err == wait.ErrWaitTimeout {
+		t.Logf("Volume (%s) status did not change to %s after %v seconds\n",
+			volumeName,
+			status,
+			volumeStatusTimeoutSeconds)
+		return
+	}
+	if err != nil {
+		t.Fatalf("Cannot get existing Cinder volume (%s): %v", volumeName, err)
+	}
 }
@@ -360,7 +376,7 @@ func TestVolumes(t *testing.T) {
 	}
 	t.Logf("Volume (%s) created\n", vol)
 
-	WaitForVolumeStatus(t, os, vol, volumeAvailableStatus, volumeCreateTimeoutSeconds)
+	WaitForVolumeStatus(t, os, vol, volumeAvailableStatus)
 
 	diskId, err := os.AttachDisk(os.localInstanceID, vol)
 	if err != nil {
@@ -368,7 +384,7 @@ func TestVolumes(t *testing.T) {
 	}
 	t.Logf("Volume (%s) attached, disk ID: %s\n", vol, diskId)
 
-	WaitForVolumeStatus(t, os, vol, volumeInUseStatus, volumeCreateTimeoutSeconds)
+	WaitForVolumeStatus(t, os, vol, volumeInUseStatus)
 
 	devicePath := os.GetDevicePath(diskId)
 	if !strings.HasPrefix(devicePath, "/dev/disk/by-id/") {
@@ -382,7 +398,7 @@ func TestVolumes(t *testing.T) {
 	}
 	t.Logf("Volume (%s) detached\n", vol)
 
-	WaitForVolumeStatus(t, os, vol, volumeAvailableStatus, volumeCreateTimeoutSeconds)
+	WaitForVolumeStatus(t, os, vol, volumeAvailableStatus)
 
 	err = os.DeleteVolume(vol)
 	if err != nil {
```