Merge pull request #117243 from jsafrane/vsphere-node-informer

Add node check to vSphere cloud provider
This commit is contained in:
Kubernetes Prow Robot 2023-05-04 14:37:13 -07:00 committed by GitHub
commit 8aa49a7955
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 205 additions and 5 deletions

View File

@ -27,11 +27,14 @@ import (
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/vim25/mo"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
coreclients "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
"k8s.io/legacy-cloud-providers/vsphere/vclib"
)
@ -61,6 +64,9 @@ type NodeManager struct {
//CredentialsManager
credentialManager *SecretCredentialManager
nodeLister corelisters.NodeLister
nodeGetter coreclients.NodesGetter
// Mutexes
registeredNodesLock sync.RWMutex
nodeInfoLock sync.RWMutex
@ -271,10 +277,43 @@ func (nm *NodeManager) GetNode(nodeName k8stypes.NodeName) (v1.Node, error) {
nm.registeredNodesLock.RLock()
node := nm.registeredNodes[convertToString(nodeName)]
nm.registeredNodesLock.RUnlock()
if node == nil {
return v1.Node{}, vclib.ErrNoVMFound
if node != nil {
klog.V(4).Infof("Node %s found in vSphere cloud provider cache", nodeName)
return *node, nil
}
return *node, nil
if nm.nodeLister != nil {
klog.V(4).Infof("Node %s missing in vSphere cloud provider cache, trying node informer")
node, err := nm.nodeLister.Get(convertToString(nodeName))
if err != nil {
if !errors.IsNotFound(err) {
return v1.Node{}, err
}
// Fall through with IsNotFound error and try to get the node from the API server
} else {
node := node.DeepCopy()
nm.addNode(node)
klog.V(4).Infof("Node %s found in vSphere cloud provider node informer", nodeName)
return *node, nil
}
}
if nm.nodeGetter != nil {
klog.V(4).Infof("Node %s missing in vSphere cloud provider caches, trying the API server")
node, err := nm.nodeGetter.Nodes().Get(context.TODO(), convertToString(nodeName), metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return v1.Node{}, err
}
// Fall through with IsNotFound error to keep the code consistent with the above
} else {
nm.addNode(node)
klog.V(4).Infof("Node %s found in the API server", nodeName)
return *node, nil
}
}
klog.V(4).Infof("Node %s not found in vSphere cloud provider", nodeName)
return v1.Node{}, vclib.ErrNoVMFound
}
func (nm *NodeManager) getNodes() map[string]*v1.Node {
@ -515,3 +554,11 @@ func (nm *NodeManager) GetHostsInZone(ctx context.Context, zoneFailureDomain str
klog.V(4).Infof("GetHostsInZone %v returning: %v", zoneFailureDomain, hosts)
return hosts, nil
}
func (nm *NodeManager) SetNodeLister(nodeLister corelisters.NodeLister) {
nm.nodeLister = nodeLister
}
func (nm *NodeManager) SetNodeGetter(nodeGetter coreclients.NodesGetter) {
nm.nodeGetter = nodeGetter
}

View File

@ -0,0 +1,149 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vsphere
import (
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/legacy-cloud-providers/vsphere/vclib"
)
// Annotation used to distinguish nodes in node cache / informer / API server
const nodeAnnotation = "test"
func getNode(annotation string) *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
Annotations: map[string]string{
nodeAnnotation: annotation,
},
},
}
}
func TestGetNode(t *testing.T) {
tests := []struct {
name string
cachedNodes []*v1.Node
informerNodes []*v1.Node // "nil" means that the NodeManager has no nodeLister
apiServerNodes []*v1.Node // "nil" means that the NodeManager has no nodeGetter
expectedNodeAnnotation string
expectNotFound bool
}{
{
name: "No cached node anywhere",
cachedNodes: []*v1.Node{},
informerNodes: []*v1.Node{},
apiServerNodes: []*v1.Node{},
expectNotFound: true,
},
{
name: "No lister & getter",
cachedNodes: []*v1.Node{},
informerNodes: nil,
apiServerNodes: nil,
expectNotFound: true,
},
{
name: "cache is used first",
cachedNodes: []*v1.Node{getNode("cache")},
informerNodes: []*v1.Node{getNode("informer")},
apiServerNodes: []*v1.Node{getNode("apiserver")},
expectedNodeAnnotation: "cache",
},
{
name: "informer is used second",
cachedNodes: []*v1.Node{},
informerNodes: []*v1.Node{getNode("informer")},
apiServerNodes: []*v1.Node{getNode("apiserver")},
expectedNodeAnnotation: "informer",
},
{
name: "API server is used last",
cachedNodes: []*v1.Node{},
informerNodes: []*v1.Node{},
apiServerNodes: []*v1.Node{getNode("apiserver")},
expectedNodeAnnotation: "apiserver",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// local NodeManager cache
cache := make(map[string]*v1.Node)
for _, node := range test.cachedNodes {
cache[node.Name] = node
}
// Client with apiServerNodes
objs := []runtime.Object{}
for _, node := range test.apiServerNodes {
objs = append(objs, node)
}
client := fake.NewSimpleClientset(objs...)
nodeGetter := client.CoreV1()
// Informer + nodeLister. Despite the client already has apiServerNodes, they won't appear in the
// nodeLister, because the informer is never started.
factory := informers.NewSharedInformerFactory(client, 0 /* no resync */)
nodeInformer := factory.Core().V1().Nodes()
for _, node := range test.informerNodes {
nodeInformer.Informer().GetStore().Add(node)
}
nodeLister := nodeInformer.Lister()
nodeManager := NodeManager{
registeredNodes: cache,
}
if test.informerNodes != nil {
nodeManager.SetNodeLister(nodeLister)
}
if test.apiServerNodes != nil {
nodeManager.SetNodeGetter(nodeGetter)
}
node, err := nodeManager.GetNode("node1")
if test.expectNotFound && err != vclib.ErrNoVMFound {
t.Errorf("Expected NotFound error, got: %v", err)
}
if !test.expectNotFound && err != nil {
t.Errorf("Unexpected error: %s", err)
}
if test.expectedNodeAnnotation != "" {
if node.Annotations == nil {
t.Errorf("Expected node with annotation %q, got nil", test.expectedNodeAnnotation)
} else {
if ann := node.Annotations[nodeAnnotation]; ann != test.expectedNodeAnnotation {
t.Errorf("Expected node with annotation %q, got %q", test.expectedNodeAnnotation, ann)
}
}
}
})
}
}

View File

@ -276,6 +276,7 @@ func init() {
// Initialize passes a Kubernetes clientBuilder interface to the cloud provider
func (vs *VSphere) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
vs.kubeClient = clientBuilder.ClientOrDie("vsphere-legacy-cloud-provider")
vs.nodeManager.SetNodeGetter(vs.kubeClient.CoreV1())
}
// Initialize Node Informers
@ -318,6 +319,9 @@ func (vs *VSphere) SetInformers(informerFactory informers.SharedInformerFactory)
cache.ResourceEventHandlerFuncs{UpdateFunc: vs.syncNodeZoneLabels},
zoneLabelsResyncPeriod,
)
nodeLister := informerFactory.Core().V1().Nodes().Lister()
vs.nodeManager.SetNodeLister(nodeLister)
klog.V(4).Infof("Node informers in vSphere cloud provider initialized")
}