mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #78140 from zhan849/aws-get-instance-by-id
Cloud provider AWS library should query instance by ID when possible
This commit is contained in:
commit
416a717eff
@ -33,10 +33,13 @@ go_library(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/pkg/version:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//staging/src/k8s.io/cloud-provider:go_default_library",
|
||||
"//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",
|
||||
@ -82,6 +85,8 @@ go_test(
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
|
||||
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
|
||||
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
|
||||
|
@ -53,10 +53,13 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
informercorev1 "k8s.io/client-go/informers/core/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/pkg/version"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/cloud-provider"
|
||||
nodehelpers "k8s.io/cloud-provider/node/helpers"
|
||||
@ -509,8 +512,13 @@ type Cloud struct {
|
||||
|
||||
instanceCache instanceCache
|
||||
|
||||
clientBuilder cloudprovider.ControllerClientBuilder
|
||||
kubeClient clientset.Interface
|
||||
clientBuilder cloudprovider.ControllerClientBuilder
|
||||
kubeClient clientset.Interface
|
||||
|
||||
nodeInformer informercorev1.NodeInformer
|
||||
// Extract the function out to make it easier to test
|
||||
nodeInformerHasSynced cache.InformerSynced
|
||||
|
||||
eventBroadcaster record.EventBroadcaster
|
||||
eventRecorder record.EventRecorder
|
||||
|
||||
@ -748,6 +756,14 @@ func (p *awsSDKProvider) getCrossRequestRetryDelay(regionName string) *CrossRequ
|
||||
return delayer
|
||||
}
|
||||
|
||||
// SetInformers implements InformerUser interface by setting up informer-fed caches for aws lib to
|
||||
// leverage Kubernetes API for caching
|
||||
func (c *Cloud) SetInformers(informerFactory informers.SharedInformerFactory) {
|
||||
klog.Infof("Setting up informers for Cloud")
|
||||
c.nodeInformer = informerFactory.Core().V1().Nodes()
|
||||
c.nodeInformerHasSynced = c.nodeInformer.Informer().HasSynced
|
||||
}
|
||||
|
||||
func (p *awsSDKProvider) Compute(regionName string) (EC2, error) {
|
||||
awsConfig := &aws.Config{
|
||||
Region: ®ionName,
|
||||
@ -1289,7 +1305,6 @@ func newAWSCloud(cfg CloudConfig, awsServices Services) (*Cloud, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return awsCloud, nil
|
||||
}
|
||||
|
||||
@ -4520,7 +4535,18 @@ func (c *Cloud) findInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance,
|
||||
// Returns the instance with the specified node name
|
||||
// Like findInstanceByNodeName, but returns error if node not found
|
||||
func (c *Cloud) getInstanceByNodeName(nodeName types.NodeName) (*ec2.Instance, error) {
|
||||
instance, err := c.findInstanceByNodeName(nodeName)
|
||||
var instance *ec2.Instance
|
||||
|
||||
// we leverage node cache to try to retrieve node's provider id first, as
|
||||
// get instance by provider id is way more efficient than by filters in
|
||||
// aws context
|
||||
awsID, err := c.nodeNameToProviderID(nodeName)
|
||||
if err != nil {
|
||||
klog.V(3).Infof("Unable to convert node name %q to aws instanceID, fall back to findInstanceByNodeName: %v", nodeName, err)
|
||||
instance, err = c.findInstanceByNodeName(nodeName)
|
||||
} else {
|
||||
instance, err = c.getInstanceByID(string(awsID))
|
||||
}
|
||||
if err == nil && instance == nil {
|
||||
return nil, cloudprovider.InstanceNotFound
|
||||
}
|
||||
@ -4540,6 +4566,26 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
|
||||
return awsInstance, instance, err
|
||||
}
|
||||
|
||||
func (c *Cloud) nodeNameToProviderID(nodeName types.NodeName) (InstanceID, error) {
|
||||
if len(nodeName) == 0 {
|
||||
return "", fmt.Errorf("no nodeName provided")
|
||||
}
|
||||
|
||||
if c.nodeInformerHasSynced == nil || !c.nodeInformerHasSynced() {
|
||||
return "", fmt.Errorf("node informer has not synced yet")
|
||||
}
|
||||
|
||||
node, err := c.nodeInformer.Lister().Get(string(nodeName))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(node.Spec.ProviderID) == 0 {
|
||||
return "", fmt.Errorf("node has no providerID")
|
||||
}
|
||||
|
||||
return KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
|
||||
}
|
||||
|
||||
func setNodeDisk(
|
||||
nodeDiskMap map[types.NodeName]map[KubernetesVolumeID]bool,
|
||||
volumeID KubernetesVolumeID,
|
||||
|
@ -36,6 +36,8 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
cloudvolume "k8s.io/cloud-provider/volume"
|
||||
)
|
||||
|
||||
@ -552,6 +554,7 @@ func mockInstancesResp(selfInstance *ec2.Instance, instances []*ec2.Instance) (*
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
awsCloud.kubeClient = fake.NewSimpleClientset()
|
||||
return awsCloud, awsServices
|
||||
}
|
||||
|
||||
@ -561,6 +564,7 @@ func mockAvailabilityZone(availabilityZone string) *Cloud {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
awsCloud.kubeClient = fake.NewSimpleClientset()
|
||||
return awsCloud
|
||||
}
|
||||
|
||||
@ -1910,6 +1914,53 @@ func TestRegionIsValid(t *testing.T) {
|
||||
assert.False(t, isRegionValid("pl-fake-991a", fake.metadata), "expected region 'pl-fake-991' to be invalid but it was not")
|
||||
}
|
||||
|
||||
func TestNodeNameToProviderID(t *testing.T) {
|
||||
testNodeName := types.NodeName("ip-10-0-0-1.ec2.internal")
|
||||
testProviderID := "aws:///us-east-1c/i-02bce90670bb0c7cd"
|
||||
fakeAWS := newMockedFakeAWSServices(TestClusterID)
|
||||
c, err := newAWSCloud(CloudConfig{}, fakeAWS)
|
||||
assert.NoError(t, err)
|
||||
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
|
||||
c.SetInformers(fakeInformerFactory)
|
||||
|
||||
// no node name
|
||||
_, err = c.nodeNameToProviderID("")
|
||||
assert.Error(t, err)
|
||||
|
||||
// informer has not synced
|
||||
c.nodeInformerHasSynced = informerNotSynced
|
||||
_, err = c.nodeNameToProviderID(testNodeName)
|
||||
assert.Error(t, err)
|
||||
|
||||
// informer has synced but node not found
|
||||
c.nodeInformerHasSynced = informerSynced
|
||||
_, err = c.nodeNameToProviderID(testNodeName)
|
||||
assert.Error(t, err)
|
||||
|
||||
// we are able to find the node in cache
|
||||
err = c.nodeInformer.Informer().GetStore().Add(&v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: string(testNodeName),
|
||||
},
|
||||
Spec: v1.NodeSpec{
|
||||
ProviderID: testProviderID,
|
||||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
_, err = c.nodeNameToProviderID(testNodeName)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func informerSynced() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func informerNotSynced() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func newMockedFakeAWSServices(id string) *FakeAWSServices {
|
||||
s := NewFakeAWSServices(id)
|
||||
s.ec2 = &MockedFakeEC2{FakeEC2Impl: s.ec2.(*FakeEC2Impl)}
|
||||
|
Loading…
Reference in New Issue
Block a user