mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 13:02:14 +00:00
Commonize GetZone with GetZoneByProviderID, retries for node update in case of conflicts
This commit is contained in:
parent
bae7a214f6
commit
89b3887aea
@ -24,6 +24,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
||||||
"net"
|
"net"
|
||||||
@ -65,6 +66,7 @@ import (
|
|||||||
const (
|
const (
|
||||||
ProviderName = "vsphere"
|
ProviderName = "vsphere"
|
||||||
providerIDPrefix = "vsphere://"
|
providerIDPrefix = "vsphere://"
|
||||||
|
updateNodeRetryCount = 3
|
||||||
VolDir = "kubevols"
|
VolDir = "kubevols"
|
||||||
RoundTripperDefaultCount = 3
|
RoundTripperDefaultCount = 3
|
||||||
DummyVMPrefixName = "vsphere-k8s"
|
DummyVMPrefixName = "vsphere-k8s"
|
||||||
@ -1535,10 +1537,10 @@ func (vs *VSphere) NodeAdded(obj interface{}) {
|
|||||||
if err := vs.nodeManager.RegisterNode(node); err != nil {
|
if err := vs.nodeManager.RegisterNode(node); err != nil {
|
||||||
klog.Errorf("failed to add node %+v: %v", node, err)
|
klog.Errorf("failed to add node %+v: %v", node, err)
|
||||||
}
|
}
|
||||||
vs.reconcileZonesForNode(node)
|
vs.setNodeZoneLabels(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (vs *VSphere) reconcileZonesForNode(node *v1.Node) {
|
func (vs *VSphere) setNodeZoneLabels(node *v1.Node) {
|
||||||
nodeZone := node.ObjectMeta.Labels[v1.LabelTopologyZone]
|
nodeZone := node.ObjectMeta.Labels[v1.LabelTopologyZone]
|
||||||
nodeRegion := node.ObjectMeta.Labels[v1.LabelTopologyRegion]
|
nodeRegion := node.ObjectMeta.Labels[v1.LabelTopologyRegion]
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
@ -1558,16 +1560,30 @@ func (vs *VSphere) reconcileZonesForNode(node *v1.Node) {
|
|||||||
labels[v1.LabelTopologyZone] = zone.FailureDomain
|
labels[v1.LabelTopologyZone] = zone.FailureDomain
|
||||||
labels[v1.LabelTopologyRegion] = zone.Region
|
labels[v1.LabelTopologyRegion] = zone.Region
|
||||||
|
|
||||||
updatedNode, err = vs.kubeClient.CoreV1().Nodes().Update(ctx, updatedNode, metav1.UpdateOptions{})
|
err = tryUpdateNode(ctx, vs.kubeClient, updatedNode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("vSphere cloud provider can not update node with zones info: %v", err)
|
klog.Errorf("vSphere cloud provider can not update node with zones info: %v", err)
|
||||||
} else {
|
} else {
|
||||||
klog.V(3).Infof("Node %s updated with zone and region lables", node.Name)
|
klog.V(4).Infof("Node %s updated with zone and region labels", updatedNode.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func tryUpdateNode(ctx context.Context, client clientset.Interface, updatedNode *v1.Node) error {
|
||||||
|
for i := 0; i < updateNodeRetryCount; i++ {
|
||||||
|
_, err := client.CoreV1().Nodes().Update(ctx, updatedNode, metav1.UpdateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
if !apierrors.IsConflict(err) {
|
||||||
|
return fmt.Errorf("vSphere cloud provider can not update node with zones info: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("update node exceeds retry count")
|
||||||
|
}
|
||||||
|
|
||||||
// Notification handler when node is removed from k8s cluster.
|
// Notification handler when node is removed from k8s cluster.
|
||||||
func (vs *VSphere) NodeDeleted(obj interface{}) {
|
func (vs *VSphere) NodeDeleted(obj interface{}) {
|
||||||
node, ok := obj.(*v1.Node)
|
node, ok := obj.(*v1.Node)
|
||||||
@ -1661,14 +1677,9 @@ func withTagsClient(ctx context.Context, connection *vclib.VSphereConnection, f
|
|||||||
return f(c)
|
return f(c)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetZone implements Zones.GetZone
|
func (vs *VSphere) getZoneByVmUUIDAndNodeName(ctx context.Context, vmUUID string, nodeName k8stypes.NodeName) (cloudprovider.Zone, error) {
|
||||||
func (vs *VSphere) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
|
|
||||||
nodeName, err := vs.CurrentNodeName(ctx, vs.hostName)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Cannot get node name.")
|
|
||||||
return cloudprovider.Zone{}, err
|
|
||||||
}
|
|
||||||
zone := cloudprovider.Zone{}
|
zone := cloudprovider.Zone{}
|
||||||
|
|
||||||
vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
|
vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Cannot connect to vsphere. Get zone for node %s error", nodeName)
|
klog.Errorf("Cannot connect to vsphere. Get zone for node %s error", nodeName)
|
||||||
@ -1679,7 +1690,7 @@ func (vs *VSphere) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
|
|||||||
klog.Errorf("Cannot connect to datacenter. Get zone for node %s error", nodeName)
|
klog.Errorf("Cannot connect to datacenter. Get zone for node %s error", nodeName)
|
||||||
return cloudprovider.Zone{}, err
|
return cloudprovider.Zone{}, err
|
||||||
}
|
}
|
||||||
vmHost, err := dc.GetHostByVMUUID(ctx, vs.vmUUID)
|
vmHost, err := dc.GetHostByVMUUID(ctx, vmUUID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Cannot find VM runtime host. Get zone for node %s error", nodeName)
|
klog.Errorf("Cannot find VM runtime host. Get zone for node %s error", nodeName)
|
||||||
return cloudprovider.Zone{}, err
|
return cloudprovider.Zone{}, err
|
||||||
@ -1697,100 +1708,12 @@ func (vs *VSphere) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
|
|||||||
// search the hierarchy, example order: ["Host", "Cluster", "Datacenter", "Folder"]
|
// search the hierarchy, example order: ["Host", "Cluster", "Datacenter", "Folder"]
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
obj := objects[len(objects)-1-i]
|
obj := objects[len(objects)-1-i]
|
||||||
tags, err := client.ListAttachedTags(ctx, obj)
|
attachedTags, err := client.ListAttachedTags(ctx, obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Cannot list attached tags. Get zone for node %s: %s", nodeName, err)
|
klog.Errorf("Cannot list attached tags. Get zone for node %s: %s", nodeName, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, value := range tags {
|
for _, value := range attachedTags {
|
||||||
tag, err := client.GetTag(ctx, value)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Get tag %s: %s", value, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
category, err := client.GetCategory(ctx, tag.CategoryID)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Get category %s error", value)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
found := func() {
|
|
||||||
klog.Errorf("Found %q tag (%s) for %s attached to %s", category.Name, tag.Name, vs.vmUUID, obj.Reference())
|
|
||||||
}
|
|
||||||
switch {
|
|
||||||
case category.Name == vs.cfg.Labels.Zone:
|
|
||||||
zone.FailureDomain = tag.Name
|
|
||||||
found()
|
|
||||||
case category.Name == vs.cfg.Labels.Region:
|
|
||||||
zone.Region = tag.Name
|
|
||||||
found()
|
|
||||||
}
|
|
||||||
|
|
||||||
if zone.FailureDomain != "" && zone.Region != "" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if zone.Region == "" {
|
|
||||||
return fmt.Errorf("vSphere region category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Region, nodeName, vs.vmUUID)
|
|
||||||
}
|
|
||||||
if zone.FailureDomain == "" {
|
|
||||||
return fmt.Errorf("vSphere zone category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Zone, nodeName, vs.vmUUID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Get zone for node %s: %s", nodeName, err)
|
|
||||||
return cloudprovider.Zone{}, err
|
|
||||||
}
|
|
||||||
return zone, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (vs *VSphere) GetZoneByNodeName(ctx context.Context, nodeName k8stypes.NodeName) (cloudprovider.Zone, error) {
|
|
||||||
return cloudprovider.Zone{}, cloudprovider.NotImplemented
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO (dmoiseev) refactor, commonize with GetZone
|
|
||||||
func (vs *VSphere) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
|
|
||||||
zone := cloudprovider.Zone{}
|
|
||||||
vmUUID := strings.Replace(providerID, providerIDPrefix, "", 1)
|
|
||||||
|
|
||||||
vsi, err := vs.getVSphereInstanceForServer(vs.cfg.Workspace.VCenterIP, ctx)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Cannot connect to vsphere. Get zone for vm %s error", vmUUID)
|
|
||||||
return cloudprovider.Zone{}, err
|
|
||||||
}
|
|
||||||
dc, err := vclib.GetDatacenter(ctx, vsi.conn, vs.cfg.Workspace.Datacenter)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Cannot connect to datacenter. Get zone for vm %s error", vmUUID)
|
|
||||||
return cloudprovider.Zone{}, err
|
|
||||||
}
|
|
||||||
vmHost, err := dc.GetHostByVMUUID(ctx, vmUUID)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Cannot find VM runtime host. Get zone for vm %s error", vmUUID)
|
|
||||||
return cloudprovider.Zone{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pc := vsi.conn.Client.ServiceContent.PropertyCollector
|
|
||||||
err = withTagsClient(ctx, vsi.conn, func(c *rest.Client) error {
|
|
||||||
client := tags.NewManager(c)
|
|
||||||
// example result: ["Folder", "Datacenter", "Cluster", "Host"]
|
|
||||||
objects, err := mo.Ancestors(ctx, vsi.conn.Client, pc, *vmHost)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// search the hierarchy, example order: ["Host", "Cluster", "Datacenter", "Folder"]
|
|
||||||
for i := range objects {
|
|
||||||
obj := objects[len(objects)-1-i]
|
|
||||||
tags, err := client.ListAttachedTags(ctx, obj)
|
|
||||||
if err != nil {
|
|
||||||
klog.Errorf("Cannot list attached tags. Get zone for vm %s: %s", vmUUID, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for _, value := range tags {
|
|
||||||
tag, err := client.GetTag(ctx, value)
|
tag, err := client.GetTag(ctx, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Get tag %s: %s", value, err)
|
klog.Errorf("Get tag %s: %s", value, err)
|
||||||
@ -1821,22 +1744,49 @@ func (vs *VSphere) GetZoneByProviderID(ctx context.Context, providerID string) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
if zone.Region == "" {
|
if zone.Region == "" {
|
||||||
return fmt.Errorf("vSphere region category %q does not match any tags for vm %s", vs.cfg.Labels.Region, vmUUID)
|
return fmt.Errorf("vSphere region category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Region, nodeName, vmUUID)
|
||||||
}
|
}
|
||||||
if zone.FailureDomain == "" {
|
if zone.FailureDomain == "" {
|
||||||
return fmt.Errorf("vSphere zone category %q does not match any tags for vm %s", vs.cfg.Labels.Zone, vmUUID)
|
return fmt.Errorf("vSphere zone category %q does not match any tags for node %s [%s]", vs.cfg.Labels.Zone, nodeName, vmUUID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Get zone for vm %s: %s", vmUUID, err)
|
klog.Errorf("Get zone for node %s: %s", nodeName, err)
|
||||||
return cloudprovider.Zone{}, err
|
return cloudprovider.Zone{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return zone, nil
|
return zone, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetZone implements Zones.GetZone
|
||||||
|
func (vs *VSphere) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
|
||||||
|
nodeName, err := vs.CurrentNodeName(ctx, vs.hostName)
|
||||||
|
if err != nil {
|
||||||
|
klog.Errorf("Cannot get node name.")
|
||||||
|
return cloudprovider.Zone{}, err
|
||||||
|
}
|
||||||
|
return vs.getZoneByVmUUIDAndNodeName(ctx, vs.vmUUID, nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (vs *VSphere) GetZoneByNodeName(ctx context.Context, nodeName k8stypes.NodeName) (cloudprovider.Zone, error) {
|
||||||
|
return cloudprovider.Zone{}, cloudprovider.NotImplemented
|
||||||
|
}
|
||||||
|
|
||||||
|
func (vs *VSphere) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
|
||||||
|
var nodeName k8stypes.NodeName
|
||||||
|
vmUUID := strings.Replace(providerID, providerIDPrefix, "", 1)
|
||||||
|
|
||||||
|
for nName, nInfo := range vs.nodeManager.nodeInfoMap {
|
||||||
|
if nInfo.vmUUID == vmUUID {
|
||||||
|
nodeName = convertToK8sType(nName)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return vs.getZoneByVmUUIDAndNodeName(ctx, vmUUID, nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
// GetLabelsForVolume implements the PVLabeler interface for VSphere
|
// GetLabelsForVolume implements the PVLabeler interface for VSphere
|
||||||
// since this interface is used by the PV label admission controller.
|
// since this interface is used by the PV label admission controller.
|
||||||
func (vs *VSphere) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
|
func (vs *VSphere) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user