From 6ae76205d7a63a64a0178fb0edded9967d11a66e Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Mon, 20 May 2019 16:20:33 -0700 Subject: [PATCH] Cloud provider AWS library should query instance by ID when possible --- .../k8s.io/legacy-cloud-providers/aws/BUILD | 5 ++ .../k8s.io/legacy-cloud-providers/aws/aws.go | 54 +++++++++++++++++-- .../legacy-cloud-providers/aws/aws_test.go | 51 ++++++++++++++++++ 3 files changed, 106 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/BUILD b/staging/src/k8s.io/legacy-cloud-providers/aws/BUILD index 3fa4d5b4a49..f12856f1f7d 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/BUILD +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/BUILD @@ -33,10 +33,13 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/pkg/version:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library", @@ -82,6 +85,8 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", + 
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/cloud-provider/volume:go_default_library", "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library", diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go b/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go index 43d3ea3d04e..a2e049a1d04 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/aws.go @@ -53,10 +53,13 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + informercorev1 "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/pkg/version" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/cloud-provider" nodehelpers "k8s.io/cloud-provider/node/helpers" @@ -509,8 +512,13 @@ type Cloud struct { instanceCache instanceCache - clientBuilder cloudprovider.ControllerClientBuilder - kubeClient clientset.Interface + clientBuilder cloudprovider.ControllerClientBuilder + kubeClient clientset.Interface + + nodeInformer informercorev1.NodeInformer + // Extract the function out to make it easier to test + nodeInformerHasSynced cache.InformerSynced + eventBroadcaster record.EventBroadcaster eventRecorder record.EventRecorder @@ -748,6 +756,14 @@ func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequ return delayer } +// SetInformers implements InformerUser interface by setting up informer-fed caches for aws lib to +// leverage Kubernetes API for caching +func (c *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) { + klog.Infof("Setting up informers for Cloud") + c.nodeInformer = informerFactory.Core().V1().Nodes() + c.nodeInformerHasSynced = 
c.nodeInformer.Informer().HasSynced +} + func (p *awsSDKProvider) Compute(regionName string) (EC2, error) { awsConfig := &aws.Config{ Region: &regionName, @@ -1289,7 +1305,6 @@ func newAWSCloud(cfg CloudConfig, awsServices Services) (*Cloud, error) { return nil, err } } - return awsCloud, nil } @@ -4492,7 +4507,18 @@ func (c *Cloud) findInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance, // Returns the instance with the specified node name // Like findInstanceByNodeName, but returns error if node not found func (c *Cloud) getInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance, error) { - instance, err := c.findInstanceByNodeName(nodeName) + var instance *ec2.Instance + + // we leverage node cache to try to retrieve node's provider id first, as + // get instance by provider id is way more efficient than by filters in + // aws context + awsID, err := c.nodeNameToProviderID(nodeName) + if err != nil { + klog.V(3).Infof("Unable to convert node name %q to aws instanceID, fall back to findInstanceByNodeName: %v", nodeName, err) + instance, err = c.findInstanceByNodeName(nodeName) + } else { + instance, err = c.getInstanceByID(string(awsID)) + } if err == nil && instance == nil { return nil, cloudprovider.InstanceNotFound } @@ -4512,6 +4538,26 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins return awsInstance, instance, err } +func (c *Cloud) nodeNameToProviderID(nodeName types.NodeName) (InstanceID, error) { + if len(nodeName) == 0 { + return "", fmt.Errorf("no nodeName provided") + } + + if c.nodeInformerHasSynced == nil || !c.nodeInformerHasSynced() { + return "", fmt.Errorf("node informer has not synced yet") + } + + node, err := c.nodeInformer.Lister().Get(string(nodeName)) + if err != nil { + return "", err + } + if len(node.Spec.ProviderID) == 0 { + return "", fmt.Errorf("node has no providerID") + } + + return KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID() +} + func setNodeDisk( nodeDiskMap
map[types.NodeName]map[KubernetesVolumeID]bool, volumeID KubernetesVolumeID, diff --git a/staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go b/staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go index ac1d311c52a..a3d82f8908a 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go +++ b/staging/src/k8s.io/legacy-cloud-providers/aws/aws_test.go @@ -36,6 +36,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" cloudvolume "k8s.io/cloud-provider/volume" ) @@ -552,6 +554,7 @@ func mockInstancesResp(selfInstance *ec2.Instance, instances []*ec2.Instance) (* if err != nil { panic(err) } + awsCloud.kubeClient = fake.NewSimpleClientset() return awsCloud, awsServices } @@ -561,6 +564,7 @@ func mockAvailabilityZone(availabilityZone string) *Cloud { if err != nil { panic(err) } + awsCloud.kubeClient = fake.NewSimpleClientset() return awsCloud } @@ -1875,6 +1879,53 @@ func TestRegionIsValid(t *testing.T) { assert.False(t, isRegionValid("pl-fake-991a", fake.metadata), "expected region 'pl-fake-991' to be invalid but it was not") } +func TestNodeNameToProviderID(t *testing.T) { + testNodeName := types.NodeName("ip-10-0-0-1.ec2.internal") + testProviderID := "aws:///us-east-1c/i-02bce90670bb0c7cd" + fakeAWS := newMockedFakeAWSServices(TestClusterID) + c, err := newAWSCloud(CloudConfig{}, fakeAWS) + assert.NoError(t, err) + + fakeClient := &fake.Clientset{} + fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + c.SetInformers(fakeInformerFactory) + + // no node name + _, err = c.nodeNameToProviderID("") + assert.Error(t, err) + + // informer has not synced + c.nodeInformerHasSynced = informerNotSynced + _, err = c.nodeNameToProviderID(testNodeName) + assert.Error(t, err) + + // informer has synced but node not found + c.nodeInformerHasSynced = informerSynced + _, err = 
c.nodeNameToProviderID(testNodeName) + assert.Error(t, err) + + // we are able to find the node in cache + err = c.nodeInformer.Informer().GetStore().Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: string(testNodeName), + }, + Spec: v1.NodeSpec{ + ProviderID: testProviderID, + }, + }) + assert.NoError(t, err) + _, err = c.nodeNameToProviderID(testNodeName) + assert.NoError(t, err) +} + +func informerSynced() bool { + return true +} + +func informerNotSynced() bool { + return false +} + func newMockedFakeAWSServices(id string) *FakeAWSServices { s := NewFakeAWSServices(id) s.ec2 = &MockedFakeEC2{FakeEC2Impl: s.ec2.(*FakeEC2Impl)}