Refactored disk cloudprovider methods to use generated client; Refactored gce_disks unit tests; Removed unused gce_op.go and associated unit tests.

This commit is contained in:
Cheng Xing 2018-05-23 15:34:56 -07:00
parent 5da925ad4f
commit d33c1e3ba8
8 changed files with 208 additions and 519 deletions

View File

@ -31,7 +31,6 @@ go_library(
"gce_loadbalancer_internal.go",
"gce_loadbalancer_naming.go",
"gce_networkendpointgroup.go",
"gce_op.go",
"gce_routes.go",
"gce_securitypolicy.go",
"gce_targetpool.go",

View File

@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"constants.go",
"context.go",
"doc.go",
"gce_projects.go",
"gen.go",

View File

@ -0,0 +1,31 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"context"
"time"
)
const (
defaultCallTimeout = 1 * time.Hour
)
// ContextWithCallTimeout returns a context with a default timeout, used for generated client calls.
func ContextWithCallTimeout() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), defaultCallTimeout)
}

View File

@ -160,22 +160,20 @@ var AllServices = []*ServiceInfo{
Resource: "disks",
keyType: Zonal,
serviceType: reflect.TypeOf(&ga.DisksService{}),
},
{
Object: "Disk",
Service: "Disks",
Resource: "disks",
version: VersionAlpha,
keyType: Zonal,
serviceType: reflect.TypeOf(&alpha.DisksService{}),
additionalMethods: []string{
"Resize",
},
},
{
Object: "Disk",
Service: "RegionDisks",
Resource: "disks",
version: VersionAlpha,
version: VersionBeta,
keyType: Regional,
serviceType: reflect.TypeOf(&alpha.DisksService{}),
serviceType: reflect.TypeOf(&beta.RegionDisksService{}),
additionalMethods: []string{
"Resize",
},
},
{
Object: "Firewall",

View File

@ -38,6 +38,8 @@ import (
compute "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
"k8s.io/kubernetes/pkg/features"
)
@ -65,7 +67,7 @@ type diskServiceManager interface {
sizeGb int64,
tagsStr string,
diskType string,
zone string) (gceObject, error)
zone string) error
// Creates a new regional persistent disk on GCE with the given disk spec.
CreateRegionalDiskOnCloudProvider(
@ -73,41 +75,35 @@ type diskServiceManager interface {
sizeGb int64,
tagsStr string,
diskType string,
zones sets.String) (gceObject, error)
zones sets.String) error
// Deletes the persistent disk from GCE with the given diskName.
DeleteDiskOnCloudProvider(zone string, disk string) (gceObject, error)
DeleteDiskOnCloudProvider(zone string, disk string) error
// Deletes the regional persistent disk from GCE with the given diskName.
DeleteRegionalDiskOnCloudProvider(diskName string) (gceObject, error)
DeleteRegionalDiskOnCloudProvider(diskName string) error
// Attach a persistent disk on GCE with the given disk spec to the specified instance.
AttachDiskOnCloudProvider(
disk *GCEDisk,
readWrite string,
instanceZone string,
instanceName string) (gceObject, error)
instanceName string) error
// Detach a persistent disk on GCE with the given disk spec from the specified instance.
DetachDiskOnCloudProvider(
instanceZone string,
instanceName string,
devicePath string) (gceObject, error)
devicePath string) error
ResizeDiskOnCloudProvider(disk *GCEDisk, sizeGb int64, zone string) (gceObject, error)
RegionalResizeDiskOnCloudProvider(disk *GCEDisk, sizeGb int64) (gceObject, error)
ResizeDiskOnCloudProvider(disk *GCEDisk, sizeGb int64, zone string) error
RegionalResizeDiskOnCloudProvider(disk *GCEDisk, sizeGb int64) error
// Gets the persistent disk from GCE with the given diskName.
GetDiskFromCloudProvider(zone string, diskName string) (*GCEDisk, error)
// Gets the regional persistent disk from GCE with the given diskName.
GetRegionalDiskFromCloudProvider(diskName string) (*GCEDisk, error)
// Waits until GCE reports the given operation in the given zone as done.
WaitForZoneOp(op gceObject, zone string, mc *metricContext) error
// Waits until GCE reports the given operation in the given region is done.
WaitForRegionalOp(op gceObject, mc *metricContext) error
}
type gceServiceManager struct {
@ -121,11 +117,11 @@ func (manager *gceServiceManager) CreateDiskOnCloudProvider(
sizeGb int64,
tagsStr string,
diskType string,
zone string) (gceObject, error) {
zone string) error {
diskTypeURI, err := manager.getDiskTypeURI(
manager.gce.region /* diskRegion */, singleZone{zone}, diskType, false /* useBetaAPI */)
if err != nil {
return nil, err
return err
}
diskToCreateV1 := &compute.Disk{
@ -134,8 +130,10 @@ func (manager *gceServiceManager) CreateDiskOnCloudProvider(
Description: tagsStr,
Type: diskTypeURI,
}
return manager.gce.service.Disks.Insert(
manager.gce.projectID, zone, diskToCreateV1).Do()
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return manager.gce.c.Disks().Insert(ctx, meta.ZonalKey(name, zone), diskToCreateV1)
}
func (manager *gceServiceManager) CreateRegionalDiskOnCloudProvider(
@ -143,42 +141,44 @@ func (manager *gceServiceManager) CreateRegionalDiskOnCloudProvider(
sizeGb int64,
tagsStr string,
diskType string,
replicaZones sets.String) (gceObject, error) {
replicaZones sets.String) error {
if utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
diskTypeURI, err := manager.getDiskTypeURI(
manager.gce.region /* diskRegion */, multiZone{replicaZones}, diskType, true /* useBetaAPI */)
if err != nil {
return nil, err
}
fullyQualifiedReplicaZones := []string{}
for _, replicaZone := range replicaZones.UnsortedList() {
fullyQualifiedReplicaZones = append(
fullyQualifiedReplicaZones, manager.getReplicaZoneURI(replicaZone, true))
}
diskToCreateBeta := &computebeta.Disk{
Name: name,
SizeGb: sizeGb,
Description: tagsStr,
Type: diskTypeURI,
ReplicaZones: fullyQualifiedReplicaZones,
}
return manager.gce.serviceBeta.RegionDisks.Insert(
manager.gce.projectID, manager.gce.region, diskToCreateBeta).Do()
if !utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
return fmt.Errorf("the regional PD feature is only available with the %s Kubernetes feature gate enabled", features.GCERegionalPersistentDisk)
}
return nil, fmt.Errorf("the regional PD feature is only available with the %s Kubernetes feature gate enabled", features.GCERegionalPersistentDisk)
diskTypeURI, err := manager.getDiskTypeURI(
manager.gce.region /* diskRegion */, multiZone{replicaZones}, diskType, true /* useBetaAPI */)
if err != nil {
return err
}
fullyQualifiedReplicaZones := []string{}
for _, replicaZone := range replicaZones.UnsortedList() {
fullyQualifiedReplicaZones = append(
fullyQualifiedReplicaZones, manager.getReplicaZoneURI(replicaZone, true))
}
diskToCreateBeta := &computebeta.Disk{
Name: name,
SizeGb: sizeGb,
Description: tagsStr,
Type: diskTypeURI,
ReplicaZones: fullyQualifiedReplicaZones,
}
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return manager.gce.c.BetaRegionDisks().Insert(ctx, meta.RegionalKey(name, manager.gce.region), diskToCreateBeta)
}
func (manager *gceServiceManager) AttachDiskOnCloudProvider(
disk *GCEDisk,
readWrite string,
instanceZone string,
instanceName string) (gceObject, error) {
instanceName string) error {
source, err := manager.getDiskSourceURI(disk)
if err != nil {
return nil, err
return err
}
attachedDiskV1 := &compute.AttachedDisk{
@ -188,16 +188,19 @@ func (manager *gceServiceManager) AttachDiskOnCloudProvider(
Source: source,
Type: diskTypePersistent,
}
return manager.gce.service.Instances.AttachDisk(
manager.gce.projectID, instanceZone, instanceName, attachedDiskV1).Do()
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return manager.gce.c.Instances().AttachDisk(ctx, meta.ZonalKey(instanceName, instanceZone), attachedDiskV1)
}
func (manager *gceServiceManager) DetachDiskOnCloudProvider(
instanceZone string,
instanceName string,
devicePath string) (gceObject, error) {
return manager.gce.service.Instances.DetachDisk(
manager.gce.projectID, instanceZone, instanceName, devicePath).Do()
devicePath string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return manager.gce.c.Instances().DetachDisk(ctx, meta.ZonalKey(instanceName, instanceZone), devicePath)
}
func (manager *gceServiceManager) GetDiskFromCloudProvider(
@ -211,8 +214,9 @@ func (manager *gceServiceManager) GetDiskFromCloudProvider(
return nil, fmt.Errorf("Can not fetch disk. Zone is specified (%q). But disk name is empty.", zone)
}
diskStable, err := manager.gce.service.Disks.Get(
manager.gce.projectID, zone, diskName).Do()
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
diskStable, err := manager.gce.c.Disks().Get(ctx, meta.ZonalKey(diskName, zone))
if err != nil {
return nil, err
}
@ -240,56 +244,50 @@ func (manager *gceServiceManager) GetDiskFromCloudProvider(
func (manager *gceServiceManager) GetRegionalDiskFromCloudProvider(
diskName string) (*GCEDisk, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
diskBeta, err := manager.gce.serviceBeta.RegionDisks.Get(
manager.gce.projectID, manager.gce.region, diskName).Do()
if err != nil {
return nil, err
}
zones := sets.NewString()
for _, zoneURI := range diskBeta.ReplicaZones {
zones.Insert(lastComponent(zoneURI))
}
return &GCEDisk{
ZoneInfo: multiZone{zones},
Region: lastComponent(diskBeta.Region),
Name: diskBeta.Name,
Kind: diskBeta.Kind,
Type: diskBeta.Type,
SizeGb: diskBeta.SizeGb,
}, nil
if !utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
return nil, fmt.Errorf("the regional PD feature is only available with the %s Kubernetes feature gate enabled", features.GCERegionalPersistentDisk)
}
return nil, fmt.Errorf("the regional PD feature is only available with the %s Kubernetes feature gate enabled", features.GCERegionalPersistentDisk)
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
diskBeta, err := manager.gce.c.BetaRegionDisks().Get(ctx, meta.RegionalKey(diskName, manager.gce.region))
if err != nil {
return nil, err
}
zones := sets.NewString()
for _, zoneURI := range diskBeta.ReplicaZones {
zones.Insert(lastComponent(zoneURI))
}
return &GCEDisk{
ZoneInfo: multiZone{zones},
Region: lastComponent(diskBeta.Region),
Name: diskBeta.Name,
Kind: diskBeta.Kind,
Type: diskBeta.Type,
SizeGb: diskBeta.SizeGb,
}, nil
}
func (manager *gceServiceManager) DeleteDiskOnCloudProvider(
zone string,
diskName string) (gceObject, error) {
return manager.gce.service.Disks.Delete(
manager.gce.projectID, zone, diskName).Do()
diskName string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return manager.gce.c.Disks().Delete(ctx, meta.ZonalKey(diskName, zone))
}
func (manager *gceServiceManager) DeleteRegionalDiskOnCloudProvider(
diskName string) (gceObject, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
return manager.gce.serviceBeta.RegionDisks.Delete(
manager.gce.projectID, manager.gce.region, diskName).Do()
diskName string) error {
if !utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
return fmt.Errorf("the regional PD feature is only available with the %s Kubernetes feature gate enabled", features.GCERegionalPersistentDisk)
}
return nil, fmt.Errorf("the regional PD feature is only available with the %s Kubernetes feature gate enabled", features.GCERegionalPersistentDisk)
}
func (manager *gceServiceManager) WaitForZoneOp(
op gceObject, zone string, mc *metricContext) error {
return manager.gce.waitForZoneOp(op, zone, mc)
}
func (manager *gceServiceManager) WaitForRegionalOp(
op gceObject, mc *metricContext) error {
return manager.gce.waitForRegionOp(op, manager.gce.region, mc)
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return manager.gce.c.BetaRegionDisks().Delete(ctx, meta.RegionalKey(diskName, manager.gce.region))
}
func (manager *gceServiceManager) getDiskSourceURI(disk *GCEDisk) (string, error) {
@ -411,21 +409,28 @@ func (manager *gceServiceManager) getRegionFromZone(zoneInfo zoneType) (string,
return region, nil
}
func (manager *gceServiceManager) ResizeDiskOnCloudProvider(disk *GCEDisk, sizeGb int64, zone string) (gceObject, error) {
func (manager *gceServiceManager) ResizeDiskOnCloudProvider(disk *GCEDisk, sizeGb int64, zone string) error {
resizeServiceRequest := &compute.DisksResizeRequest{
SizeGb: sizeGb,
}
return manager.gce.service.Disks.Resize(manager.gce.projectID, zone, disk.Name, resizeServiceRequest).Do()
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return manager.gce.c.Disks().Resize(ctx, meta.ZonalKey(disk.Name, zone), resizeServiceRequest)
}
func (manager *gceServiceManager) RegionalResizeDiskOnCloudProvider(disk *GCEDisk, sizeGb int64) (gceObject, error) {
if utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
resizeServiceRequest := &computebeta.RegionDisksResizeRequest{
SizeGb: sizeGb,
}
return manager.gce.serviceBeta.RegionDisks.Resize(manager.gce.projectID, disk.Region, disk.Name, resizeServiceRequest).Do()
func (manager *gceServiceManager) RegionalResizeDiskOnCloudProvider(disk *GCEDisk, sizeGb int64) error {
if !utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
return fmt.Errorf("the regional PD feature is only available with the %s Kubernetes feature gate enabled", features.GCERegionalPersistentDisk)
}
return nil, fmt.Errorf("the regional PD feature is only available with the %s Kubernetes feature gate enabled", features.GCERegionalPersistentDisk)
resizeServiceRequest := &computebeta.RegionDisksResizeRequest{
SizeGb: sizeGb,
}
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return manager.gce.c.BetaRegionDisks().Resize(ctx, meta.RegionalKey(disk.Name, disk.Region), resizeServiceRequest)
}
// Disks is interface for manipulation with GCE PDs.
@ -555,14 +560,7 @@ func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOn
readWrite = "READ_ONLY"
}
attachOp, err := gce.manager.AttachDiskOnCloudProvider(
disk, readWrite, instance.Zone, instance.Name)
if err != nil {
return mc.Observe(err)
}
return gce.manager.WaitForZoneOp(attachOp, instance.Zone, mc)
return mc.Observe(gce.manager.AttachDiskOnCloudProvider(disk, readWrite, instance.Zone, instance.Name))
}
func (gce *GCECloud) DetachDisk(devicePath string, nodeName types.NodeName) error {
@ -582,12 +580,7 @@ func (gce *GCECloud) DetachDisk(devicePath string, nodeName types.NodeName) erro
}
mc := newDiskMetricContextZonal("detach", gce.region, inst.Zone)
detachOp, err := gce.manager.DetachDiskOnCloudProvider(inst.Zone, inst.Name, devicePath)
if err != nil {
return mc.Observe(err)
}
return gce.manager.WaitForZoneOp(detachOp, inst.Zone, mc)
return mc.Observe(gce.manager.DetachDiskOnCloudProvider(inst.Zone, inst.Name, devicePath))
}
func (gce *GCECloud) DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) {
@ -675,17 +668,10 @@ func (gce *GCECloud) CreateDisk(
mc := newDiskMetricContextZonal("create", gce.region, zone)
createOp, err := gce.manager.CreateDiskOnCloudProvider(
err = gce.manager.CreateDiskOnCloudProvider(
name, sizeGb, tagsStr, diskType, zone)
if isGCEError(err, "alreadyExists") {
glog.Warningf("GCE PD %q already exists, reusing", name)
return nil
} else if err != nil {
return mc.Observe(err)
}
err = gce.manager.WaitForZoneOp(createOp, zone, mc)
mc.Observe(err)
if isGCEError(err, "alreadyExists") {
glog.Warningf("GCE PD %q already exists, reusing", name)
return nil
@ -723,17 +709,10 @@ func (gce *GCECloud) CreateRegionalDisk(
mc := newDiskMetricContextRegional("create", gce.region)
createOp, err := gce.manager.CreateRegionalDiskOnCloudProvider(
err = gce.manager.CreateRegionalDiskOnCloudProvider(
name, sizeGb, tagsStr, diskType, replicaZones)
if isGCEError(err, "alreadyExists") {
glog.Warningf("GCE PD %q already exists, reusing", name)
return nil
} else if err != nil {
return mc.Observe(err)
}
err = gce.manager.WaitForRegionalOp(createOp, mc)
mc.Observe(err)
if isGCEError(err, "alreadyExists") {
glog.Warningf("GCE PD %q already exists, reusing", name)
return nil
@ -786,31 +765,26 @@ func (gce *GCECloud) ResizeDisk(diskToResize string, oldSize resource.Quantity,
switch zoneInfo := disk.ZoneInfo.(type) {
case singleZone:
mc = newDiskMetricContextZonal("resize", disk.Region, zoneInfo.zone)
resizeOp, err := gce.manager.ResizeDiskOnCloudProvider(disk, requestGB, zoneInfo.zone)
err := gce.manager.ResizeDiskOnCloudProvider(disk, requestGB, zoneInfo.zone)
if err != nil {
return oldSize, mc.Observe(err)
} else {
return newSizeQuant, mc.Observe(err)
}
waitErr := gce.manager.WaitForZoneOp(resizeOp, zoneInfo.zone, mc)
if waitErr != nil {
return oldSize, waitErr
}
return newSizeQuant, nil
case multiZone:
if utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
mc = newDiskMetricContextRegional("resize", disk.Region)
resizeOp, err := gce.manager.RegionalResizeDiskOnCloudProvider(disk, requestGB)
if err != nil {
return oldSize, mc.Observe(err)
}
waitErr := gce.manager.WaitForRegionalOp(resizeOp, mc)
if waitErr != nil {
return oldSize, waitErr
}
return newSizeQuant, nil
if !utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
return oldSize, fmt.Errorf("disk.ZoneInfo has unexpected type %T", zoneInfo)
}
mc = newDiskMetricContextRegional("resize", disk.Region)
err := gce.manager.RegionalResizeDiskOnCloudProvider(disk, requestGB)
if err != nil {
return oldSize, mc.Observe(err)
} else {
return newSizeQuant, mc.Observe(err)
}
return oldSize, fmt.Errorf("disk.ZoneInfo has unexpected type %T", zoneInfo)
case nil:
return oldSize, fmt.Errorf("PD has nil ZoneInfo: %v", disk)
default:
@ -1026,21 +1000,14 @@ func (gce *GCECloud) doDeleteDisk(diskToDelete string) error {
switch zoneInfo := disk.ZoneInfo.(type) {
case singleZone:
mc = newDiskMetricContextZonal("delete", disk.Region, zoneInfo.zone)
deleteOp, err := gce.manager.DeleteDiskOnCloudProvider(zoneInfo.zone, disk.Name)
if err != nil {
return mc.Observe(err)
}
return gce.manager.WaitForZoneOp(deleteOp, zoneInfo.zone, mc)
return mc.Observe(gce.manager.DeleteDiskOnCloudProvider(zoneInfo.zone, disk.Name))
case multiZone:
if utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
mc = newDiskMetricContextRegional("delete", disk.Region)
deleteOp, err := gce.manager.DeleteRegionalDiskOnCloudProvider(disk.Name)
if err != nil {
return mc.Observe(err)
}
return gce.manager.WaitForRegionalOp(deleteOp, mc)
if !utilfeature.DefaultFeatureGate.Enabled(features.GCERegionalPersistentDisk) {
return fmt.Errorf("disk.ZoneInfo has unexpected type %T", zoneInfo)
}
return fmt.Errorf("disk.ZoneInfo has unexpected type %T", zoneInfo)
mc = newDiskMetricContextRegional("delete", disk.Region)
return mc.Observe(gce.manager.DeleteRegionalDiskOnCloudProvider(disk.Name))
case nil:
return fmt.Errorf("PD has nil ZoneInfo: %v", disk)
default:

View File

@ -70,9 +70,6 @@ func TestCreateDisk_Basic(t *testing.T) {
if !fakeManager.createDiskCalled {
t.Error("Never called GCE disk create.")
}
if !fakeManager.doesOpMatch {
t.Error("Ops used in WaitForZoneOp does not match what's returned by CreateDisk.")
}
// Partial check of equality between disk description sent to GCE and parameters of method.
diskToCreate := fakeManager.diskToCreateStable
@ -127,9 +124,6 @@ func TestCreateRegionalDisk_Basic(t *testing.T) {
if !fakeManager.createDiskCalled {
t.Error("Never called GCE disk create.")
}
if !fakeManager.doesOpMatch {
t.Error("Ops used in WaitForZoneOp does not match what's returned by CreateDisk.")
}
// Partial check of equality between disk description sent to GCE and parameters of method.
diskToCreate := fakeManager.diskToCreateStable
@ -165,7 +159,7 @@ func TestCreateDisk_DiskAlreadyExists(t *testing.T) {
// Inject disk AlreadyExists error.
alreadyExistsError := googleapi.ErrorItem{Reason: "alreadyExists"}
fakeManager.waitForOpError = &googleapi.Error{
fakeManager.opError = &googleapi.Error{
Errors: []googleapi.ErrorItem{alreadyExistsError},
}
@ -314,9 +308,6 @@ func TestDeleteDisk_Basic(t *testing.T) {
if !fakeManager.deleteDiskCalled {
t.Error("Never called GCE disk delete.")
}
if !fakeManager.doesOpMatch {
t.Error("Ops used in WaitForZoneOp does not match what's returned by DeleteDisk.")
}
}
@ -644,16 +635,12 @@ const (
type FakeServiceManager struct {
// Common fields shared among tests
targetAPI targetClientAPI
gceProjectID string
gceRegion string
opAlpha *computealpha.Operation // Mocks an operation returned by GCE API calls
opBeta *computebeta.Operation // Mocks an operation returned by GCE API calls
opStable *compute.Operation // Mocks an operation returned by GCE API calls
doesOpMatch bool
zonalDisks map[string]string // zone: diskName
regionalDisks map[string]sets.String // diskName: zones
waitForOpError error // Error to be returned by WaitForZoneOp or WaitForRegionalOp
targetAPI targetClientAPI
gceProjectID string
gceRegion string
zonalDisks map[string]string // zone: diskName
regionalDisks map[string]sets.String // diskName: zones
opError error
// Fields for TestCreateDisk
createDiskCalled bool
@ -684,12 +671,11 @@ func (manager *FakeServiceManager) CreateDiskOnCloudProvider(
sizeGb int64,
tagsStr string,
diskType string,
zone string) (gceObject, error) {
zone string) error {
manager.createDiskCalled = true
switch t := manager.targetAPI; t {
case targetStable:
manager.opStable = &compute.Operation{}
diskTypeURI := gceComputeAPIEndpoint + "projects/" + fmt.Sprintf(diskTypeURITemplateSingleZone, manager.gceProjectID, zone, diskType)
diskToCreateV1 := &compute.Disk{
Name: name,
@ -699,9 +685,8 @@ func (manager *FakeServiceManager) CreateDiskOnCloudProvider(
}
manager.diskToCreateStable = diskToCreateV1
manager.zonalDisks[zone] = diskToCreateV1.Name
return manager.opStable, nil
return nil
case targetBeta:
manager.opBeta = &computebeta.Operation{}
diskTypeURI := gceComputeAPIEndpoint + "projects/" + fmt.Sprintf(diskTypeURITemplateSingleZone, manager.gceProjectID, zone, diskType)
diskToCreateBeta := &computebeta.Disk{
Name: name,
@ -711,9 +696,8 @@ func (manager *FakeServiceManager) CreateDiskOnCloudProvider(
}
manager.diskToCreateBeta = diskToCreateBeta
manager.zonalDisks[zone] = diskToCreateBeta.Name
return manager.opBeta, nil
return nil
case targetAlpha:
manager.opAlpha = &computealpha.Operation{}
diskTypeURI := gceComputeAPIEndpointBeta + "projects/" + fmt.Sprintf(diskTypeURITemplateSingleZone, manager.gceProjectID, zone, diskType)
diskToCreateAlpha := &computealpha.Disk{
Name: name,
@ -723,9 +707,9 @@ func (manager *FakeServiceManager) CreateDiskOnCloudProvider(
}
manager.diskToCreateAlpha = diskToCreateAlpha
manager.zonalDisks[zone] = diskToCreateAlpha.Name
return manager.opAlpha, nil
return nil
default:
return nil, fmt.Errorf("unexpected type: %T", t)
return fmt.Errorf("unexpected type: %T", t)
}
}
@ -738,13 +722,12 @@ func (manager *FakeServiceManager) CreateRegionalDiskOnCloudProvider(
sizeGb int64,
tagsStr string,
diskType string,
zones sets.String) (gceObject, error) {
zones sets.String) error {
manager.createDiskCalled = true
diskTypeURI := gceComputeAPIEndpointBeta + "projects/" + fmt.Sprintf(diskTypeURITemplateRegional, manager.gceProjectID, manager.gceRegion, diskType)
switch t := manager.targetAPI; t {
case targetStable:
manager.opStable = &compute.Operation{}
diskToCreateV1 := &compute.Disk{
Name: name,
SizeGb: sizeGb,
@ -753,13 +736,13 @@ func (manager *FakeServiceManager) CreateRegionalDiskOnCloudProvider(
}
manager.diskToCreateStable = diskToCreateV1
manager.regionalDisks[diskToCreateV1.Name] = zones
return manager.opStable, nil
return nil
case targetBeta:
return nil, fmt.Errorf("RegionalDisk CreateDisk op not supported in beta.")
return fmt.Errorf("RegionalDisk CreateDisk op not supported in beta.")
case targetAlpha:
return nil, fmt.Errorf("RegionalDisk CreateDisk op not supported in alpha.")
return fmt.Errorf("RegionalDisk CreateDisk op not supported in alpha.")
default:
return nil, fmt.Errorf("unexpected type: %T", t)
return fmt.Errorf("unexpected type: %T", t)
}
}
@ -767,39 +750,33 @@ func (manager *FakeServiceManager) AttachDiskOnCloudProvider(
disk *GCEDisk,
readWrite string,
instanceZone string,
instanceName string) (gceObject, error) {
instanceName string) error {
switch t := manager.targetAPI; t {
case targetStable:
manager.opStable = &compute.Operation{}
return manager.opStable, nil
return nil
case targetBeta:
manager.opBeta = &computebeta.Operation{}
return manager.opBeta, nil
return nil
case targetAlpha:
manager.opAlpha = &computealpha.Operation{}
return manager.opAlpha, nil
return nil
default:
return nil, fmt.Errorf("unexpected type: %T", t)
return fmt.Errorf("unexpected type: %T", t)
}
}
func (manager *FakeServiceManager) DetachDiskOnCloudProvider(
instanceZone string,
instanceName string,
devicePath string) (gceObject, error) {
devicePath string) error {
switch t := manager.targetAPI; t {
case targetStable:
manager.opStable = &compute.Operation{}
return manager.opStable, nil
return nil
case targetBeta:
manager.opBeta = &computebeta.Operation{}
return manager.opBeta, nil
return nil
case targetAlpha:
manager.opAlpha = &computealpha.Operation{}
return manager.opAlpha, nil
return nil
default:
return nil, fmt.Errorf("unexpected type: %T", t)
return fmt.Errorf("unexpected type: %T", t)
}
}
@ -856,13 +833,13 @@ func (manager *FakeServiceManager) GetRegionalDiskFromCloudProvider(
func (manager *FakeServiceManager) ResizeDiskOnCloudProvider(
disk *GCEDisk,
size int64,
zone string) (gceObject, error) {
zone string) error {
panic("Not implmented")
}
func (manager *FakeServiceManager) RegionalResizeDiskOnCloudProvider(
disk *GCEDisk,
size int64) (gceObject, error) {
size int64) error {
panic("Not implemented")
}
@ -871,91 +848,41 @@ func (manager *FakeServiceManager) RegionalResizeDiskOnCloudProvider(
*/
func (manager *FakeServiceManager) DeleteDiskOnCloudProvider(
zone string,
disk string) (gceObject, error) {
disk string) error {
manager.deleteDiskCalled = true
delete(manager.zonalDisks, zone)
switch t := manager.targetAPI; t {
case targetStable:
manager.opStable = &compute.Operation{}
return manager.opStable, nil
return nil
case targetBeta:
manager.opBeta = &computebeta.Operation{}
return manager.opBeta, nil
return nil
case targetAlpha:
manager.opAlpha = &computealpha.Operation{}
return manager.opAlpha, nil
return nil
default:
return nil, fmt.Errorf("unexpected type: %T", t)
return fmt.Errorf("unexpected type: %T", t)
}
}
func (manager *FakeServiceManager) DeleteRegionalDiskOnCloudProvider(
disk string) (gceObject, error) {
disk string) error {
manager.deleteDiskCalled = true
delete(manager.regionalDisks, disk)
switch t := manager.targetAPI; t {
case targetStable:
manager.opStable = &compute.Operation{}
return manager.opStable, nil
return nil
case targetBeta:
manager.opBeta = &computebeta.Operation{}
return manager.opBeta, nil
return nil
case targetAlpha:
manager.opAlpha = &computealpha.Operation{}
return manager.opAlpha, nil
return nil
default:
return nil, fmt.Errorf("unexpected type: %T", t)
return fmt.Errorf("unexpected type: %T", t)
}
}
func (manager *FakeServiceManager) WaitForZoneOp(
op gceObject,
zone string,
mc *metricContext) error {
switch v := op.(type) {
case *computealpha.Operation:
if op.(*computealpha.Operation) == manager.opAlpha {
manager.doesOpMatch = true
}
case *computebeta.Operation:
if op.(*computebeta.Operation) == manager.opBeta {
manager.doesOpMatch = true
}
case *compute.Operation:
if op.(*compute.Operation) == manager.opStable {
manager.doesOpMatch = true
}
default:
return fmt.Errorf("unexpected type: %T", v)
}
return manager.waitForOpError
}
func (manager *FakeServiceManager) WaitForRegionalOp(
op gceObject, mc *metricContext) error {
switch v := op.(type) {
case *computealpha.Operation:
if op.(*computealpha.Operation) == manager.opAlpha {
manager.doesOpMatch = true
}
case *computebeta.Operation:
if op.(*computebeta.Operation) == manager.opBeta {
manager.doesOpMatch = true
}
case *compute.Operation:
if op.(*compute.Operation) == manager.opStable {
manager.doesOpMatch = true
}
default:
return fmt.Errorf("unexpected type: %T", v)
}
return manager.waitForOpError
}
func createNodeZones(zones []string) map[string]sets.String {
nodeZones := map[string]sets.String{}
for _, zone := range zones {

View File

@ -1,180 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"encoding/json"
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/golang/glog"
computealpha "google.golang.org/api/compute/v0.alpha"
computebeta "google.golang.org/api/compute/v0.beta"
computev1 "google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
)
func (gce *GCECloud) waitForOp(op *computev1.Operation, getOperation func(operationName string) (*computev1.Operation, error), mc *metricContext) error {
if op == nil {
return mc.Observe(fmt.Errorf("operation must not be nil"))
}
if opIsDone(op) {
return getErrorFromOp(op)
}
opStart := time.Now()
opName := op.Name
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
start := time.Now()
gce.operationPollRateLimiter.Accept()
duration := time.Since(start)
if duration > 5*time.Second {
glog.V(2).Infof("pollOperation: throttled %v for %v", duration, opName)
}
pollOp, err := getOperation(opName)
if err != nil {
glog.Warningf("GCE poll operation %s failed: pollOp: [%v] err: [%v] getErrorFromOp: [%v]",
opName, pollOp, err, getErrorFromOp(pollOp))
}
done := opIsDone(pollOp)
if done {
duration := time.Since(opStart)
if duration > 1*time.Minute {
// Log the JSON. It's cleaner than the %v structure.
enc, err := pollOp.MarshalJSON()
if err != nil {
glog.Warningf("waitForOperation: long operation (%v): %v (failed to encode to JSON: %v)",
duration, pollOp, err)
} else {
glog.V(2).Infof("waitForOperation: long operation (%v): %v",
duration, string(enc))
}
}
}
return done, mc.Observe(getErrorFromOp(pollOp))
})
}
func opIsDone(op *computev1.Operation) bool {
return op != nil && op.Status == "DONE"
}
func getErrorFromOp(op *computev1.Operation) error {
if op != nil && op.Error != nil && len(op.Error.Errors) > 0 {
err := &googleapi.Error{
Code: int(op.HttpErrorStatusCode),
Message: op.Error.Errors[0].Message,
}
glog.Errorf("GCE operation failed: %v", err)
return err
}
return nil
}
func (gce *GCECloud) waitForGlobalOp(op gceObject, mc *metricContext) error {
return gce.waitForGlobalOpInProject(op, gce.ProjectID(), mc)
}
func (gce *GCECloud) waitForRegionOp(op gceObject, region string, mc *metricContext) error {
return gce.waitForRegionOpInProject(op, gce.ProjectID(), region, mc)
}
func (gce *GCECloud) waitForZoneOp(op gceObject, zone string, mc *metricContext) error {
return gce.waitForZoneOpInProject(op, gce.ProjectID(), zone, mc)
}
func (gce *GCECloud) waitForGlobalOpInProject(op gceObject, projectID string, mc *metricContext) error {
switch v := op.(type) {
case *computealpha.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceAlpha.GlobalOperations.Get(projectID, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computebeta.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceBeta.GlobalOperations.Get(projectID, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computev1.Operation:
return gce.waitForOp(op.(*computev1.Operation), func(operationName string) (*computev1.Operation, error) {
return gce.service.GlobalOperations.Get(projectID, operationName).Do()
}, mc)
default:
return fmt.Errorf("unexpected type: %T", v)
}
}
func (gce *GCECloud) waitForRegionOpInProject(op gceObject, projectID, region string, mc *metricContext) error {
switch v := op.(type) {
case *computealpha.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceAlpha.RegionOperations.Get(projectID, region, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computebeta.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceBeta.RegionOperations.Get(projectID, region, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computev1.Operation:
return gce.waitForOp(op.(*computev1.Operation), func(operationName string) (*computev1.Operation, error) {
return gce.service.RegionOperations.Get(projectID, region, operationName).Do()
}, mc)
default:
return fmt.Errorf("unexpected type: %T", v)
}
}
func (gce *GCECloud) waitForZoneOpInProject(op gceObject, projectID, zone string, mc *metricContext) error {
switch v := op.(type) {
case *computealpha.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceAlpha.ZoneOperations.Get(projectID, zone, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computebeta.Operation:
return gce.waitForOp(convertToV1Operation(op), func(operationName string) (*computev1.Operation, error) {
op, err := gce.serviceBeta.ZoneOperations.Get(projectID, zone, operationName).Do()
return convertToV1Operation(op), err
}, mc)
case *computev1.Operation:
return gce.waitForOp(op.(*computev1.Operation), func(operationName string) (*computev1.Operation, error) {
return gce.service.ZoneOperations.Get(projectID, zone, operationName).Do()
}, mc)
default:
return fmt.Errorf("unexpected type: %T", v)
}
}
func convertToV1Operation(object gceObject) *computev1.Operation {
enc, err := object.MarshalJSON()
if err != nil {
panic(fmt.Sprintf("Failed to encode to json: %v", err))
}
var op computev1.Operation
if err := json.Unmarshal(enc, &op); err != nil {
panic(fmt.Sprintf("Failed to convert GCE apiObject %v to v1 operation: %v", object, err))
}
return &op
}

View File

@ -18,16 +18,12 @@ package gce
import (
"context"
"encoding/json"
"reflect"
"strings"
"testing"
"golang.org/x/oauth2/google"
computealpha "google.golang.org/api/compute/v0.alpha"
computebeta "google.golang.org/api/compute/v0.beta"
computev1 "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
)
@ -480,56 +476,6 @@ func TestGenerateCloudConfigs(t *testing.T) {
}
}
func TestConvertToV1Operation(t *testing.T) {
v1Op := getTestOperation()
enc, _ := v1Op.MarshalJSON()
var op interface{}
var alphaOp computealpha.Operation
var betaOp computebeta.Operation
if err := json.Unmarshal(enc, &alphaOp); err != nil {
t.Errorf("Failed to unmarshal operation: %v", err)
}
if err := json.Unmarshal(enc, &betaOp); err != nil {
t.Errorf("Failed to unmarshal operation: %v", err)
}
op = convertToV1Operation(&alphaOp)
if _, ok := op.(*computev1.Operation); ok {
if !reflect.DeepEqual(op, v1Op) {
t.Errorf("Failed to maintain consistency across conversion")
}
} else {
t.Errorf("Expect output to be type v1 operation, but got %v", op)
}
op = convertToV1Operation(&betaOp)
if _, ok := op.(*computev1.Operation); ok {
if !reflect.DeepEqual(op, v1Op) {
t.Errorf("Failed to maintain consistency across conversion")
}
} else {
t.Errorf("Expect output to be type v1 operation, but got %v", op)
}
}
func getTestOperation() *computev1.Operation {
return &computev1.Operation{
Name: "test",
Description: "test",
Id: uint64(12345),
Error: &computev1.OperationError{
Errors: []*computev1.OperationErrorErrors{
{
Code: "555",
Message: "error",
},
},
},
}
}
func TestNewAlphaFeatureGate(t *testing.T) {
testCases := []struct {
alphaFeatures []string