From 8d19c0002bb0b05744a21b997bf7ee3095024d25 Mon Sep 17 00:00:00 2001 From: Jan Safranek Date: Tue, 11 Apr 2023 15:41:20 +0200 Subject: [PATCH] Add node check to vSphere cloud provider At KCM startup, vSphere cloud provider builds a cache from NodeAdded events from an informer. But these events are asynchronous, the cloud provider may need information from the cache earlier, for example when detaching a volume from a node at KCM startup. If a node is missing in the cache, the cloud provider treats such a detach as successful, which is wrong. Such a volume will be attached to the node forever. To prevent this issue: 1. Try nodeLister before declaring a node as not found. A/D controller starts after its node informer has been synced. 2. Read API server before declaring a node as not found. Just in case the informer has stale data. --- .../vsphere/nodemanager.go | 57 ++++++- .../vsphere/nodemanager_test.go | 149 ++++++++++++++++++ .../legacy-cloud-providers/vsphere/vsphere.go | 4 + 3 files changed, 205 insertions(+), 5 deletions(-) create mode 100644 staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager_test.go diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go index 0fe96a7b79d..5bd3eea6e36 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go +++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager.go @@ -27,11 +27,14 @@ import ( "github.com/vmware/govmomi/object" "github.com/vmware/govmomi/vim25/mo" - "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8stypes "k8s.io/apimachinery/pkg/types" + coreclients "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" cloudprovider "k8s.io/cloud-provider" + "k8s.io/klog/v2" "k8s.io/legacy-cloud-providers/vsphere/vclib" ) @@ -61,6 +64,9 @@ type NodeManager struct { 
//CredentialsManager credentialManager *SecretCredentialManager + nodeLister corelisters.NodeLister + nodeGetter coreclients.NodesGetter + // Mutexes registeredNodesLock sync.RWMutex nodeInfoLock sync.RWMutex @@ -271,10 +277,43 @@ func (nm *NodeManager) GetNode(nodeName k8stypes.NodeName) (v1.Node, error) { nm.registeredNodesLock.RLock() node := nm.registeredNodes[convertToString(nodeName)] nm.registeredNodesLock.RUnlock() - if node == nil { - return v1.Node{}, vclib.ErrNoVMFound + if node != nil { + klog.V(4).Infof("Node %s found in vSphere cloud provider cache", nodeName) + return *node, nil } - return *node, nil + + if nm.nodeLister != nil { + klog.V(4).Infof("Node %s missing in vSphere cloud provider cache, trying node informer", nodeName) + node, err := nm.nodeLister.Get(convertToString(nodeName)) + if err != nil { + if !errors.IsNotFound(err) { + return v1.Node{}, err + } + // Fall through with IsNotFound error and try to get the node from the API server + } else { + node := node.DeepCopy() + nm.addNode(node) + klog.V(4).Infof("Node %s found in vSphere cloud provider node informer", nodeName) + return *node, nil + } + } + + if nm.nodeGetter != nil { + klog.V(4).Infof("Node %s missing in vSphere cloud provider caches, trying the API server", nodeName) + node, err := nm.nodeGetter.Nodes().Get(context.TODO(), convertToString(nodeName), metav1.GetOptions{}) + if err != nil { + if !errors.IsNotFound(err) { + return v1.Node{}, err + } + // Fall through with IsNotFound error to keep the code consistent with the above + } else { + nm.addNode(node) + klog.V(4).Infof("Node %s found in the API server", nodeName) + return *node, nil + } + } + klog.V(4).Infof("Node %s not found in vSphere cloud provider", nodeName) + return v1.Node{}, vclib.ErrNoVMFound } func (nm *NodeManager) getNodes() map[string]*v1.Node { @@ -515,3 +554,11 @@ func (nm *NodeManager) GetHostsInZone(ctx context.Context, zoneFailureDomain str klog.V(4).Infof("GetHostsInZone %v returning: %v", zoneFailureDomain, hosts) 
return hosts, nil } + +func (nm *NodeManager) SetNodeLister(nodeLister corelisters.NodeLister) { + nm.nodeLister = nodeLister +} + +func (nm *NodeManager) SetNodeGetter(nodeGetter coreclients.NodesGetter) { + nm.nodeGetter = nodeGetter +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager_test.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager_test.go new file mode 100644 index 00000000000..f1cec129b47 --- /dev/null +++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/nodemanager_test.go @@ -0,0 +1,149 @@ +//go:build !providerless +// +build !providerless + +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vsphere + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/legacy-cloud-providers/vsphere/vclib" +) + +// Annotation used to distinguish nodes in node cache / informer / API server +const nodeAnnotation = "test" + +func getNode(annotation string) *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Annotations: map[string]string{ + nodeAnnotation: annotation, + }, + }, + } +} + +func TestGetNode(t *testing.T) { + tests := []struct { + name string + cachedNodes []*v1.Node + informerNodes []*v1.Node // "nil" means that the NodeManager has no nodeLister + apiServerNodes []*v1.Node // "nil" means that the NodeManager has no nodeGetter + + expectedNodeAnnotation string + expectNotFound bool + }{ + { + name: "No cached node anywhere", + cachedNodes: []*v1.Node{}, + informerNodes: []*v1.Node{}, + apiServerNodes: []*v1.Node{}, + expectNotFound: true, + }, + { + name: "No lister & getter", + cachedNodes: []*v1.Node{}, + informerNodes: nil, + apiServerNodes: nil, + expectNotFound: true, + }, + { + name: "cache is used first", + cachedNodes: []*v1.Node{getNode("cache")}, + informerNodes: []*v1.Node{getNode("informer")}, + apiServerNodes: []*v1.Node{getNode("apiserver")}, + expectedNodeAnnotation: "cache", + }, + { + name: "informer is used second", + cachedNodes: []*v1.Node{}, + informerNodes: []*v1.Node{getNode("informer")}, + apiServerNodes: []*v1.Node{getNode("apiserver")}, + expectedNodeAnnotation: "informer", + }, + { + name: "API server is used last", + cachedNodes: []*v1.Node{}, + informerNodes: []*v1.Node{}, + apiServerNodes: []*v1.Node{getNode("apiserver")}, + expectedNodeAnnotation: "apiserver", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + // local NodeManager cache + cache := make(map[string]*v1.Node) + for _, node := 
range test.cachedNodes { + cache[node.Name] = node + } + + // Client with apiServerNodes + objs := []runtime.Object{} + for _, node := range test.apiServerNodes { + objs = append(objs, node) + } + client := fake.NewSimpleClientset(objs...) + nodeGetter := client.CoreV1() + + // Informer + nodeLister. Although the client already has apiServerNodes, they won't appear in the + // nodeLister, because the informer is never started. + factory := informers.NewSharedInformerFactory(client, 0 /* no resync */) + nodeInformer := factory.Core().V1().Nodes() + for _, node := range test.informerNodes { + nodeInformer.Informer().GetStore().Add(node) + } + nodeLister := nodeInformer.Lister() + + nodeManager := NodeManager{ + registeredNodes: cache, + } + if test.informerNodes != nil { + nodeManager.SetNodeLister(nodeLister) + } + if test.apiServerNodes != nil { + nodeManager.SetNodeGetter(nodeGetter) + } + + node, err := nodeManager.GetNode("node1") + if test.expectNotFound && err != vclib.ErrNoVMFound { + t.Errorf("Expected NotFound error, got: %v", err) + } + if !test.expectNotFound && err != nil { + t.Errorf("Unexpected error: %s", err) + } + + if test.expectedNodeAnnotation != "" { + if node.Annotations == nil { + t.Errorf("Expected node with annotation %q, got nil", test.expectedNodeAnnotation) + } else { + if ann := node.Annotations[nodeAnnotation]; ann != test.expectedNodeAnnotation { + t.Errorf("Expected node with annotation %q, got %q", test.expectedNodeAnnotation, ann) + } + } + } + }) + } +} diff --git a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go index 756c3f6c999..615094db661 100644 --- a/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go +++ b/staging/src/k8s.io/legacy-cloud-providers/vsphere/vsphere.go @@ -276,6 +276,7 @@ func init() { // Initialize passes a Kubernetes clientBuilder interface to the cloud provider func (vs *VSphere) Initialize(clientBuilder 
cloudprovider.ControllerClientBuilder, stop <-chan struct{}) { vs.kubeClient = clientBuilder.ClientOrDie("vsphere-legacy-cloud-provider") + vs.nodeManager.SetNodeGetter(vs.kubeClient.CoreV1()) } // Initialize Node Informers @@ -318,6 +319,9 @@ func (vs *VSphere) SetInformers(informerFactory informers.SharedInformerFactory) cache.ResourceEventHandlerFuncs{UpdateFunc: vs.syncNodeZoneLabels}, zoneLabelsResyncPeriod, ) + + nodeLister := informerFactory.Core().V1().Nodes().Lister() + vs.nodeManager.SetNodeLister(nodeLister) klog.V(4).Infof("Node informers in vSphere cloud provider initialized") }