move pkg/util/node to component-helpers/node/util (#105347)

Signed-off-by: Neha Lohia <nehapithadiya444@gmail.com>
This commit is contained in:
Neha Lohia
2021-11-12 21:22:27 +05:30
committed by GitHub
parent 7b9f4f18fe
commit fa1b6765d5
41 changed files with 204 additions and 303 deletions

View File

@@ -34,7 +34,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
nodeutil "k8s.io/kubernetes/pkg/util/node"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/legacy-cloud-providers/gce"
"k8s.io/metrics/pkg/client/clientset/versioned/scheme"
)

View File

@@ -43,8 +43,8 @@ import (
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
cloudprovider "k8s.io/cloud-provider"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
utilnode "k8s.io/kubernetes/pkg/util/node"
nodeutil "k8s.io/component-helpers/node/util"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
utiltaints "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/legacy-cloud-providers/gce"
netutils "k8s.io/utils/net"
@@ -112,21 +112,21 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
AddFunc: controllerutil.CreateAddNodeHandler(ca.AllocateOrOccupyCIDR),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
if newNode.Spec.PodCIDR == "" {
return ca.AllocateOrOccupyCIDR(newNode)
}
// Even if PodCIDR is assigned, but NetworkUnavailable condition is
// set to true, we need to process the node to set the condition.
networkUnavailableTaint := &v1.Taint{Key: v1.TaintNodeNetworkUnavailable, Effect: v1.TaintEffectNoSchedule}
_, cond := nodeutil.GetNodeCondition(&newNode.Status, v1.NodeNetworkUnavailable)
_, cond := controllerutil.GetNodeCondition(&newNode.Status, v1.NodeNetworkUnavailable)
if cond == nil || cond.Status != v1.ConditionFalse || utiltaints.TaintExists(newNode.Spec.Taints, networkUnavailableTaint) {
return ca.AllocateOrOccupyCIDR(newNode)
}
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(ca.ReleaseCIDR),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(ca.ReleaseCIDR),
})
klog.V(0).Infof("Using cloud CIDR allocator (provider: %v)", cloud.ProviderName())
@@ -258,11 +258,11 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
cidrStrings, err := ca.cloud.AliasRangesByProviderID(node.Spec.ProviderID)
if err != nil {
nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to get cidr(s) from provider: %v", err)
}
if len(cidrStrings) == 0 {
nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
}
//Can have at most 2 ips (one for v4 and one for v6)
@@ -290,19 +290,19 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
}
for i := 0; i < cidrUpdateRetries; i++ {
if err = utilnode.PatchNodeCIDRs(ca.client, types.NodeName(node.Name), cidrStrings); err == nil {
if err = nodeutil.PatchNodeCIDRs(ca.client, types.NodeName(node.Name), cidrStrings); err == nil {
klog.InfoS("Set the node PodCIDRs", "nodeName", node.Name, "cidrStrings", cidrStrings)
break
}
}
}
if err != nil {
nodeutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
controllerutil.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
klog.ErrorS(err, "Failed to update the node PodCIDR after multiple attempts", "nodeName", node.Name, "cidrStrings", cidrStrings)
return err
}
err = utilnode.SetNodeCondition(ca.client, types.NodeName(node.Name), v1.NodeCondition{
err = nodeutil.SetNodeCondition(ca.client, types.NodeName(node.Name), v1.NodeCondition{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionFalse,
Reason: "RouteCreated",

View File

@@ -35,7 +35,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/legacy-cloud-providers/gce"
)
@@ -142,9 +142,9 @@ func (c *Controller) Start(nodeInformer informers.NodeInformer) error {
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(c.onAdd),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(c.onUpdate),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(c.onDelete),
AddFunc: controllerutil.CreateAddNodeHandler(c.onAdd),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(c.onUpdate),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(c.onDelete),
})
return nil

View File

@@ -36,9 +36,9 @@ import (
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
utilnode "k8s.io/kubernetes/pkg/util/node"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
)
// cidrs are reserved, then node resource is patched with them
@@ -135,8 +135,8 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
AddFunc: controllerutil.CreateAddNodeHandler(ra.AllocateOrOccupyCIDR),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
// If the PodCIDRs list is not empty we either:
// - already processed a Node that already had CIDRs after NC restarted
// (cidr is marked as used),
@@ -161,7 +161,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, nodeInformer informers.No
}
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(ra.ReleaseCIDR),
DeleteFunc: controllerutil.CreateDeleteNodeHandler(ra.ReleaseCIDR),
})
return ra, nil
@@ -268,7 +268,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
podCIDR, err := r.cidrSets[idx].AllocateNext()
if err != nil {
r.removeNodeFromProcessing(node.Name)
nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
}
allocated.allocatedCIDRs[idx] = podCIDR
@@ -370,14 +370,14 @@ func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
for i := 0; i < cidrUpdateRetries; i++ {
if err = utilnode.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
klog.Infof("Set node %v PodCIDR to %v", node.Name, cidrsString)
return nil
}
}
// failed release back to the pool
klog.Errorf("Failed to update node %v PodCIDR to %v after multiple attempts: %v", node.Name, cidrsString, err)
nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
controllerutil.RecordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
// We accept the fact that we may leak CIDRs here. This is safer than releasing
// them in case when we don't know if request went through.
// NodeController restart will return all falsely allocated CIDRs to the pool.

View File

@@ -51,11 +51,11 @@ import (
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
utilnode "k8s.io/component-helpers/node/topology"
nodetopology "k8s.io/component-helpers/node/topology"
kubeletapis "k8s.io/kubelet/pkg/apis"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
taintutils "k8s.io/kubernetes/pkg/util/taints"
)
@@ -487,15 +487,15 @@ func NewNodeLifecycleController(
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
nc.taintManager = scheduler.NewNoExecuteTaintManager(ctx, kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
nc.taintManager.NodeUpdated(oldNode, newNode)
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(node, nil)
return nil
}),
@@ -504,16 +504,16 @@ func NewNodeLifecycleController(
klog.Infof("Controller will reconcile labels.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.nodeUpdateQueue.Add(node.Name)
nc.nodeEvictionMap.registerNode(node.Name)
return nil
}),
UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
nc.nodeUpdateQueue.Add(newNode.Name)
return nil
}),
DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
DeleteFunc: controllerutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
nc.nodesToRetry.Delete(node.Name)
nc.nodeEvictionMap.unregisterNode(node.Name)
return nil
@@ -657,7 +657,7 @@ func (nc *Controller) doNoScheduleTaintingPass(ctx context.Context, nodeName str
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !nodeutil.SwapNodeControllerTaint(ctx, nc.kubeClient, taintsToAdd, taintsToDel, node) {
if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
@@ -678,7 +678,7 @@ func (nc *Controller) doNoExecuteTaintingPass(ctx context.Context) {
// retry in 50 millisecond
return false, 50 * time.Millisecond
}
_, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
_, condition := controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
// Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
taintToAdd := v1.Taint{}
oppositeTaint := v1.Taint{}
@@ -694,11 +694,10 @@ func (nc *Controller) doNoExecuteTaintingPass(ctx context.Context) {
klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
return true, 0
}
result := nodeutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
result := controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
if result {
//count the evictionsNumber
zone := utilnode.GetZoneKey(node)
zone := nodetopology.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
@@ -725,7 +724,7 @@ func (nc *Controller) doEvictionPass(ctx context.Context) {
utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err))
return false, 0
}
remaining, err := nodeutil.DeletePods(ctx, nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
remaining, err := controllerutil.DeletePods(ctx, nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil {
// We are not setting eviction status here.
// New pods will be handled by zonePodEvictor retry
@@ -741,7 +740,7 @@ func (nc *Controller) doEvictionPass(ctx context.Context) {
}
if node != nil {
zone := utilnode.GetZoneKey(node)
zone := nodetopology.GetZoneKey(node)
evictionsNumber.WithLabelValues(zone).Inc()
}
@@ -768,7 +767,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
for i := range added {
klog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
controllerutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
nc.knownNodeSet[added[i].Name] = added[i]
nc.addPodEvictorForNewZone(added[i])
if nc.runTaintManager {
@@ -780,7 +779,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
for i := range deleted {
klog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
controllerutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
delete(nc.knownNodeSet, deleted[i].Name)
}
@@ -810,7 +809,7 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
// Some nodes may be excluded from disruption checking
if !isNodeExcludedFromDisruptionChecks(node) {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
zoneToNodeConditions[nodetopology.GetZoneKey(node)] = append(zoneToNodeConditions[nodetopology.GetZoneKey(node)], currentReadyCondition)
}
if currentReadyCondition != nil {
@@ -837,10 +836,10 @@ func (nc *Controller) monitorNodeHealth(ctx context.Context) error {
switch {
case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
// Report node event only once when status changed.
nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
controllerutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
fallthrough
case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
if err = nodeutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil {
if err = controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, node.Name); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
nc.nodesToRetry.Store(node.Name, struct{}{})
continue
@@ -862,7 +861,7 @@ func (nc *Controller) processTaintBaseEviction(ctx context.Context, node *v1.Nod
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !nodeutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node, v1.ConditionFalse) {
@@ -875,7 +874,7 @@ func (nc *Controller) processTaintBaseEviction(ctx context.Context, node *v1.Nod
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !nodeutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
if !controllerutil.SwapNodeControllerTaint(ctx, nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
klog.Errorf("Failed to instantly swap NotReadyTaint to UnreachableTaint. Will try again in the next cycle.")
}
} else if nc.markNodeForTainting(node, v1.ConditionUnknown) {
@@ -962,7 +961,7 @@ func (nc *Controller) tryUpdateNodeHealth(ctx context.Context, node *v1.Node) (t
var gracePeriod time.Duration
var observedReadyCondition v1.NodeCondition
_, currentReadyCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
_, currentReadyCondition := controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
if currentReadyCondition == nil {
// If ready condition is nil, then kubelet (or nodecontroller) never posted node status.
// A fake ready condition is created, where LastHeartbeatTime and LastTransitionTime is set
@@ -1005,7 +1004,7 @@ func (nc *Controller) tryUpdateNodeHealth(ctx context.Context, node *v1.Node) (t
var savedCondition *v1.NodeCondition
var savedLease *coordv1.Lease
if nodeHealth != nil {
_, savedCondition = nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
_, savedCondition = controllerutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
savedLease = nodeHealth.lease
}
@@ -1077,7 +1076,7 @@ func (nc *Controller) tryUpdateNodeHealth(ctx context.Context, node *v1.Node) (t
nowTimestamp := nc.now()
for _, nodeConditionType := range nodeConditionTypes {
_, currentCondition := nodeutil.GetNodeCondition(&node.Status, nodeConditionType)
_, currentCondition := controllerutil.GetNodeCondition(&node.Status, nodeConditionType)
if currentCondition == nil {
klog.V(2).Infof("Condition %v of node %v was never updated by kubelet", nodeConditionType, node.Name)
node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
@@ -1100,7 +1099,7 @@ func (nc *Controller) tryUpdateNodeHealth(ctx context.Context, node *v1.Node) (t
}
}
// We need to update currentReadyCondition due to its value potentially changed.
_, currentReadyCondition = nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
_, currentReadyCondition = controllerutil.GetNodeCondition(&node.Status, v1.NodeReady)
if !apiequality.Semantic.DeepEqual(currentReadyCondition, &observedReadyCondition) {
if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{}); err != nil {
@@ -1275,7 +1274,7 @@ func (nc *Controller) processPod(ctx context.Context, podItem podUpdateItem) {
return
}
_, currentReadyCondition := nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
_, currentReadyCondition := controllerutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
if currentReadyCondition == nil {
// Lack of NodeReady condition may only happen after node addition (or if it will be maliciously deleted).
// In both cases, the pod will be handled correctly (evicted if needed) during processing
@@ -1295,7 +1294,7 @@ func (nc *Controller) processPod(ctx context.Context, podItem podUpdateItem) {
}
if currentReadyCondition.Status != v1.ConditionTrue {
if err := nodeutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, nodeName); err != nil {
if err := controllerutil.MarkPodsNotReady(ctx, nc.kubeClient, nc.recorder, pods, nodeName); err != nil {
klog.Warningf("Unable to mark pod %+v NotReady on node %v: %v.", podItem, nodeName, err)
nc.podUpdateQueue.AddRateLimited(podItem)
}
@@ -1339,7 +1338,7 @@ func (nc *Controller) classifyNodes(allNodes []*v1.Node) (added, deleted, newZon
added = append(added, allNodes[i])
} else {
// Currently, we only consider new zone as updated.
zone := utilnode.GetZoneKey(allNodes[i])
zone := nodetopology.GetZoneKey(allNodes[i])
if _, found := nc.zoneStates[zone]; !found {
newZoneRepresentatives = append(newZoneRepresentatives, allNodes[i])
}
@@ -1382,7 +1381,7 @@ func (nc *Controller) ReducedQPSFunc(nodeNum int) float32 {
func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
zone := utilnode.GetZoneKey(node)
zone := nodetopology.GetZoneKey(node)
if _, found := nc.zoneStates[zone]; !found {
nc.zoneStates[zone] = stateInitial
if !nc.runTaintManager {
@@ -1403,7 +1402,7 @@ func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *Controller) cancelPodEviction(node *v1.Node) bool {
zone := utilnode.GetZoneKey(node)
zone := nodetopology.GetZoneKey(node)
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
if !nc.nodeEvictionMap.setStatus(node.Name, unmarked) {
@@ -1429,7 +1428,7 @@ func (nc *Controller) evictPods(ctx context.Context, node *v1.Node, pods []*v1.P
if ok && status == evicted {
// Node eviction already happened for this node.
// Handling immediate pod deletion.
_, err := nodeutil.DeletePods(ctx, nc.kubeClient, pods, nc.recorder, node.Name, string(node.UID), nc.daemonSetStore)
_, err := controllerutil.DeletePods(ctx, nc.kubeClient, pods, nc.recorder, node.Name, string(node.UID), nc.daemonSetStore)
if err != nil {
return false, fmt.Errorf("unable to delete pods from node %q: %v", node.Name, err)
}
@@ -1438,7 +1437,7 @@ func (nc *Controller) evictPods(ctx context.Context, node *v1.Node, pods []*v1.P
if !nc.nodeEvictionMap.setStatus(node.Name, toBeEvicted) {
klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", node.Name)
}
return nc.zonePodEvictor[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID)), nil
return nc.zonePodEvictor[nodetopology.GetZoneKey(node)].Add(node.Name, string(node.UID)), nil
}
func (nc *Controller) markNodeForTainting(node *v1.Node, status v1.ConditionStatus) bool {
@@ -1446,17 +1445,17 @@ func (nc *Controller) markNodeForTainting(node *v1.Node, status v1.ConditionStat
defer nc.evictorLock.Unlock()
if status == v1.ConditionFalse {
if !taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name)
nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name)
}
}
if status == v1.ConditionUnknown {
if !taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name)
nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name)
}
}
return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Add(node.Name, string(node.UID))
return nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Add(node.Name, string(node.UID))
}
func (nc *Controller) markNodeAsReachable(ctx context.Context, node *v1.Node) (bool, error) {
@@ -1472,7 +1471,7 @@ func (nc *Controller) markNodeAsReachable(ctx context.Context, node *v1.Node) (b
klog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
return false, err
}
return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name), nil
return nc.zoneNoExecuteTainter[nodetopology.GetZoneKey(node)].Remove(node.Name), nil
}
// ComputeZoneState returns a slice of NodeReadyConditions for all Nodes in a given zone.
@@ -1541,7 +1540,7 @@ func (nc *Controller) reconcileNodeLabels(nodeName string) error {
if len(labelsToUpdate) == 0 {
return nil
}
if !nodeutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
if !controllerutil.AddOrUpdateLabelsOnNode(nc.kubeClient, labelsToUpdate, node) {
return fmt.Errorf("failed update labels for node %+v", node)
}
return nil

View File

@@ -44,7 +44,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
"k8s.io/kubernetes/pkg/controller/testutil"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
"k8s.io/kubernetes/pkg/util/node"
taintutils "k8s.io/kubernetes/pkg/util/taints"
"k8s.io/utils/pointer"
@@ -95,7 +95,7 @@ func (nc *nodeLifecycleController) doEviction(fakeNodeHandler *testutil.FakeNode
nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string)
pods, _ := nc.getPodsAssignedToNode(value.Value)
nodeutil.DeletePods(context.TODO(), fakeNodeHandler, pods, nc.recorder, value.Value, uid, nc.daemonSetStore)
controllerutil.DeletePods(context.TODO(), fakeNodeHandler, pods, nc.recorder, value.Value, uid, nc.daemonSetStore)
_ = nc.nodeEvictionMap.setStatus(value.Value, evicted)
return true, 0
})
@@ -729,7 +729,7 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
t.Logf("listed pods %d for node %v", len(pods), value.Value)
nodeutil.DeletePods(context.TODO(), item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
controllerutil.DeletePods(context.TODO(), item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
return true, 0
})
} else {
@@ -889,7 +889,7 @@ func TestPodStatusChange(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeutil.DeletePods(context.TODO(), item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
controllerutil.DeletePods(context.TODO(), item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
return true, 0
})
}
@@ -3810,7 +3810,7 @@ func TestTryUpdateNodeHealth(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
_, savedReadyCondition := nodeutil.GetNodeCondition(nodeController.nodeHealthMap.getDeepCopy(test.node.Name).status, v1.NodeReady)
_, savedReadyCondition := controllerutil.GetNodeCondition(nodeController.nodeHealthMap.getDeepCopy(test.node.Name).status, v1.NodeReady)
savedStatus := getStatus(savedReadyCondition)
currentStatus := getStatus(currentReadyCondition)
if !apiequality.Semantic.DeepEqual(currentStatus, savedStatus) {

View File

@@ -26,8 +26,8 @@ import (
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
// NodeStatusUpdater defines a set of operations for updating the