Add node check to vSphere cloud provider

At KCM startup, the vSphere cloud provider builds a cache from NodeAdded events
delivered by an informer. These events are asynchronous, however, and the cloud
provider may need information from the cache before they arrive, for example
when detaching a volume from a node at KCM startup. If a node is missing from
the cache, the cloud provider treats such a detach as successful, which is wrong:
the volume will stay attached to the node forever.

To prevent this issue:

1. Try nodeLister before declaring a node as not found.
A/D controller starts after its node informer has been synced.

2. Read from the API server before declaring a node as not found,
just in case the informer has stale data.
This commit is contained in:
Jan Safranek 2023-04-11 15:41:20 +02:00
parent afcc207feb
commit 8d19c0002b
3 changed files with 205 additions and 5 deletions

View File

@ -27,11 +27,14 @@ import (
"github.com/vmware/govmomi/object"
"github.com/vmware/govmomi/vim25/mo"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
coreclients "k8s.io/client-go/kubernetes/typed/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
"k8s.io/legacy-cloud-providers/vsphere/vclib"
)
@ -61,6 +64,9 @@ type NodeManager struct {
//CredentialsManager
credentialManager *SecretCredentialManager
nodeLister corelisters.NodeLister
nodeGetter coreclients.NodesGetter
// Mutexes
registeredNodesLock sync.RWMutex
nodeInfoLock sync.RWMutex
@ -271,10 +277,43 @@ func (nm *NodeManager) GetNode(nodeName k8stypes.NodeName) (v1.Node, error) {
nm.registeredNodesLock.RLock()
node := nm.registeredNodes[convertToString(nodeName)]
nm.registeredNodesLock.RUnlock()
if node == nil {
return v1.Node{}, vclib.ErrNoVMFound
if node != nil {
klog.V(4).Infof("Node %s found in vSphere cloud provider cache", nodeName)
return *node, nil
}
return *node, nil
if nm.nodeLister != nil {
klog.V(4).Infof("Node %s missing in vSphere cloud provider cache, trying node informer")
node, err := nm.nodeLister.Get(convertToString(nodeName))
if err != nil {
if !errors.IsNotFound(err) {
return v1.Node{}, err
}
// Fall through with IsNotFound error and try to get the node from the API server
} else {
node := node.DeepCopy()
nm.addNode(node)
klog.V(4).Infof("Node %s found in vSphere cloud provider node informer", nodeName)
return *node, nil
}
}
if nm.nodeGetter != nil {
klog.V(4).Infof("Node %s missing in vSphere cloud provider caches, trying the API server")
node, err := nm.nodeGetter.Nodes().Get(context.TODO(), convertToString(nodeName), metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
return v1.Node{}, err
}
// Fall through with IsNotFound error to keep the code consistent with the above
} else {
nm.addNode(node)
klog.V(4).Infof("Node %s found in the API server", nodeName)
return *node, nil
}
}
klog.V(4).Infof("Node %s not found in vSphere cloud provider", nodeName)
return v1.Node{}, vclib.ErrNoVMFound
}
func (nm *NodeManager) getNodes() map[string]*v1.Node {
@ -515,3 +554,11 @@ func (nm *NodeManager) GetHostsInZone(ctx context.Context, zoneFailureDomain str
klog.V(4).Infof("GetHostsInZone %v returning: %v", zoneFailureDomain, hosts)
return hosts, nil
}
// SetNodeLister stores the node lister on the NodeManager. GetNode consults
// it when a node is missing from the registeredNodes cache; while it is nil
// that lookup step is skipped.
func (nm *NodeManager) SetNodeLister(nodeLister corelisters.NodeLister) {
	nm.nodeLister = nodeLister
}
// SetNodeGetter stores the API-server node client on the NodeManager. GetNode
// uses it as the last lookup step for a node found neither in the
// registeredNodes cache nor via the node lister; while it is nil that step is
// skipped.
func (nm *NodeManager) SetNodeGetter(nodeGetter coreclients.NodesGetter) {
	nm.nodeGetter = nodeGetter
}

View File

@ -0,0 +1,149 @@
//go:build !providerless
// +build !providerless
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package vsphere
import (
"testing"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/legacy-cloud-providers/vsphere/vclib"
)
// nodeAnnotation is the annotation key set on test nodes; its value records
// which source (node cache / informer / API server) the node was placed in,
// so a test can tell which source a returned node came from.
const nodeAnnotation = "test"
// getNode builds a test Node named "node1" carrying a single annotation,
// keyed by nodeAnnotation, whose value is the given string.
func getNode(annotation string) *v1.Node {
	meta := metav1.ObjectMeta{
		Name:        "node1",
		Annotations: map[string]string{nodeAnnotation: annotation},
	}
	return &v1.Node{ObjectMeta: meta}
}
// TestGetNode checks the lookup order of NodeManager.GetNode: the local
// registeredNodes cache first, then the node lister, then the API server.
// Each source holds a "node1" whose annotation names that source, so the
// annotation on the returned node shows which source answered. When no
// configured source has the node, vclib.ErrNoVMFound is expected.
func TestGetNode(t *testing.T) {
	tests := []struct {
		name                   string
		cachedNodes            []*v1.Node
		informerNodes          []*v1.Node // "nil" means that the NodeManager has no nodeLister
		apiServerNodes         []*v1.Node // "nil" means that the NodeManager has no nodeGetter
		expectedNodeAnnotation string
		expectNotFound         bool
	}{
		{
			name:           "No cached node anywhere",
			cachedNodes:    []*v1.Node{},
			informerNodes:  []*v1.Node{},
			apiServerNodes: []*v1.Node{},
			expectNotFound: true,
		},
		{
			name:           "No lister & getter",
			cachedNodes:    []*v1.Node{},
			informerNodes:  nil,
			apiServerNodes: nil,
			expectNotFound: true,
		},
		{
			name:                   "cache is used first",
			cachedNodes:            []*v1.Node{getNode("cache")},
			informerNodes:          []*v1.Node{getNode("informer")},
			apiServerNodes:         []*v1.Node{getNode("apiserver")},
			expectedNodeAnnotation: "cache",
		},
		{
			name:                   "informer is used second",
			cachedNodes:            []*v1.Node{},
			informerNodes:          []*v1.Node{getNode("informer")},
			apiServerNodes:         []*v1.Node{getNode("apiserver")},
			expectedNodeAnnotation: "informer",
		},
		{
			name:                   "API server is used last",
			cachedNodes:            []*v1.Node{},
			informerNodes:          []*v1.Node{},
			apiServerNodes:         []*v1.Node{getNode("apiserver")},
			expectedNodeAnnotation: "apiserver",
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// local NodeManager cache
			cache := make(map[string]*v1.Node)
			for _, node := range test.cachedNodes {
				cache[node.Name] = node
			}

			// Client with apiServerNodes
			objs := []runtime.Object{}
			for _, node := range test.apiServerNodes {
				objs = append(objs, node)
			}
			client := fake.NewSimpleClientset(objs...)
			nodeGetter := client.CoreV1()

			// Informer + nodeLister. Although the client already has apiServerNodes, they won't appear in the
			// nodeLister, because the informer is never started.
			factory := informers.NewSharedInformerFactory(client, 0 /* no resync */)
			nodeInformer := factory.Core().V1().Nodes()
			for _, node := range test.informerNodes {
				// A failed insert would otherwise show up later as a confusing
				// "node not found" result, so stop right here.
				if err := nodeInformer.Informer().GetStore().Add(node); err != nil {
					t.Fatalf("Failed to add node %s to the informer store: %v", node.Name, err)
				}
			}
			nodeLister := nodeInformer.Lister()

			nodeManager := NodeManager{
				registeredNodes: cache,
			}
			if test.informerNodes != nil {
				nodeManager.SetNodeLister(nodeLister)
			}
			if test.apiServerNodes != nil {
				nodeManager.SetNodeGetter(nodeGetter)
			}

			node, err := nodeManager.GetNode("node1")
			if test.expectNotFound && err != vclib.ErrNoVMFound {
				t.Errorf("Expected NotFound error, got: %v", err)
			}
			if !test.expectNotFound && err != nil {
				t.Errorf("Unexpected error: %s", err)
			}

			if test.expectedNodeAnnotation != "" {
				if node.Annotations == nil {
					t.Errorf("Expected node with annotation %q, got nil", test.expectedNodeAnnotation)
				} else {
					if ann := node.Annotations[nodeAnnotation]; ann != test.expectedNodeAnnotation {
						t.Errorf("Expected node with annotation %q, got %q", test.expectedNodeAnnotation, ann)
					}
				}
			}
		})
	}
}

View File

@ -276,6 +276,7 @@ func init() {
// Initialize obtains a Kubernetes client from clientBuilder (under the name
// "vsphere-legacy-cloud-provider"), keeps it on the provider, and hands its
// CoreV1 client to the node manager as the node getter. The stop channel is
// not used here.
func (vs *VSphere) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {
	client := clientBuilder.ClientOrDie("vsphere-legacy-cloud-provider")
	vs.kubeClient = client
	vs.nodeManager.SetNodeGetter(vs.kubeClient.CoreV1())
}
// Initialize Node Informers
@ -318,6 +319,9 @@ func (vs *VSphere) SetInformers(informerFactory informers.SharedInformerFactory)
cache.ResourceEventHandlerFuncs{UpdateFunc: vs.syncNodeZoneLabels},
zoneLabelsResyncPeriod,
)
nodeLister := informerFactory.Core().V1().Nodes().Lister()
vs.nodeManager.SetNodeLister(nodeLister)
klog.V(4).Infof("Node informers in vSphere cloud provider initialized")
}