Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-23 03:41:45 +00:00
Merge pull request #82489 from krzysied/node_controller_lock_map
Adding lock to node data map
commit 06609b77e8
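In short: the controller previously kept per-node health in a bare map[string]*nodeHealthData and handed out live pointers into it. This change wraps the map in a nodeHealthMap guarded by a sync.RWMutex, with reads returning deep copies, so health data can only change by going back through set(). Before the diff, here is a minimal, self-contained sketch of that pattern; the names (healthEntry, healthMap) are illustrative stand-ins, not the controller's own code:

package main

import (
	"fmt"
	"sync"
	"time"
)

// healthEntry stands in for nodeHealthData: a value the monitor loop
// reads, mutates, and writes back.
type healthEntry struct {
	probeTimestamp time.Time
	ready          bool
}

// deepCopy is nil-safe. All fields here are plain values, so copying the
// struct suffices; the real nodeHealthData also DeepCopies its pointers.
func (e *healthEntry) deepCopy() *healthEntry {
	if e == nil {
		return nil
	}
	c := *e
	return &c
}

// healthMap guards the map with an RWMutex and never leaks a live pointer:
// reads hand out deep copies, writes replace entries wholesale.
type healthMap struct {
	lock    sync.RWMutex
	entries map[string]*healthEntry
}

func newHealthMap() *healthMap {
	return &healthMap{entries: make(map[string]*healthEntry)}
}

func (m *healthMap) getDeepCopy(name string) *healthEntry {
	m.lock.RLock()
	defer m.lock.RUnlock()
	return m.entries[name].deepCopy()
}

func (m *healthMap) set(name string, e *healthEntry) {
	m.lock.Lock()
	defer m.lock.Unlock()
	m.entries[name] = e
}

func main() {
	m := newHealthMap()
	m.set("node-1", &healthEntry{probeTimestamp: time.Now(), ready: true})

	// Concurrent read-modify-write cycles: `go run -race` stays quiet
	// because each goroutine mutates only its own private copy.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			e := m.getDeepCopy("node-1")
			e.probeTimestamp = time.Now()
			m.set("node-1", e)
		}()
	}
	wg.Wait()
	fmt.Println("ready:", m.getDeepCopy("node-1").ready)
}

Note the trade-off this design accepts: getDeepCopy followed by set is a last-writer-wins update, not an atomic read-modify-write, which is fine as long as a given key has a single writer.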
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -169,6 +169,43 @@ type nodeHealthData struct {
 	lease                    *coordv1beta1.Lease
 }
 
+func (n *nodeHealthData) deepCopy() *nodeHealthData {
+	if n == nil {
+		return nil
+	}
+	return &nodeHealthData{
+		probeTimestamp:           n.probeTimestamp,
+		readyTransitionTimestamp: n.readyTransitionTimestamp,
+		status:                   n.status.DeepCopy(),
+		lease:                    n.lease.DeepCopy(),
+	}
+}
+
+type nodeHealthMap struct {
+	lock        sync.RWMutex
+	nodeHealths map[string]*nodeHealthData
+}
+
+func newNodeHealthMap() *nodeHealthMap {
+	return &nodeHealthMap{
+		nodeHealths: make(map[string]*nodeHealthData),
+	}
+}
+
+// getDeepCopy - returns copy of node health data.
+// It prevents data being changed after retrieving it from the map.
+func (n *nodeHealthMap) getDeepCopy(name string) *nodeHealthData {
+	n.lock.RLock()
+	defer n.lock.RUnlock()
+	return n.nodeHealths[name].deepCopy()
+}
+
+func (n *nodeHealthMap) set(name string, data *nodeHealthData) {
+	n.lock.Lock()
+	defer n.lock.Unlock()
+	n.nodeHealths[name] = data
+}
+
 // Controller is the controller that manages node's life cycle.
 type Controller struct {
 	taintManager *scheduler.NoExecuteTaintManager
@@ -186,7 +223,7 @@ type Controller struct {
 
 	knownNodeSet map[string]*v1.Node
 	// per Node map storing last observed health together with a local time when it was observed.
-	nodeHealthMap map[string]*nodeHealthData
+	nodeHealthMap *nodeHealthMap
 
 	// Lock to access evictor workers
 	evictorLock sync.Mutex
@@ -305,7 +342,7 @@ func NewNodeLifecycleController(
 		kubeClient:    kubeClient,
 		now:           metav1.Now,
 		knownNodeSet:  make(map[string]*v1.Node),
-		nodeHealthMap: make(map[string]*nodeHealthData),
+		nodeHealthMap: newNodeHealthMap(),
 		recorder:      recorder,
 		nodeMonitorPeriod:      nodeMonitorPeriod,
 		nodeStartupGracePeriod: nodeStartupGracePeriod,
@@ -722,6 +759,11 @@ func (nc *Controller) monitorNodeHealth() error {
 		}
 
 		decisionTimestamp := nc.now()
+		nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
+		if nodeHealthData == nil {
+			klog.Errorf("Skipping %v node processing: health data doesn't exist.", node.Name)
+			continue
+		}
 		if currentReadyCondition != nil {
 			// Check eviction timeout against decisionTimestamp
 			switch observedReadyCondition.Status {
@@ -740,12 +782,12 @@ func (nc *Controller) monitorNodeHealth() error {
 					)
 				}
 			} else {
-				if decisionTimestamp.After(nc.nodeHealthMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
+				if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
 					if nc.evictPods(node) {
 						klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
 							node.Name,
 							decisionTimestamp,
-							nc.nodeHealthMap[node.Name].readyTransitionTimestamp,
+							nodeHealthData.readyTransitionTimestamp,
 							nc.podEvictionTimeout,
 						)
 					}
@@ -766,12 +808,12 @@ func (nc *Controller) monitorNodeHealth() error {
 					)
 				}
 			} else {
-				if decisionTimestamp.After(nc.nodeHealthMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
+				if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) {
 					if nc.evictPods(node) {
 						klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
 							node.Name,
 							decisionTimestamp,
-							nc.nodeHealthMap[node.Name].readyTransitionTimestamp,
+							nodeHealthData.readyTransitionTimestamp,
 							nc.podEvictionTimeout-gracePeriod,
 						)
 					}
@@ -849,6 +891,11 @@ func legacyIsMasterNode(nodeName string) bool {
 // tryUpdateNodeHealth checks a given node's conditions and tries to update it. Returns grace period to
 // which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
 func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
+	nodeHealth := nc.nodeHealthMap.getDeepCopy(node.Name)
+	defer func() {
+		nc.nodeHealthMap.set(node.Name, nodeHealth)
+	}()
+
 	var gracePeriod time.Duration
 	var observedReadyCondition v1.NodeCondition
 	_, currentReadyCondition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
@@ -863,10 +910,10 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
 			LastTransitionTime: node.CreationTimestamp,
 		}
 		gracePeriod = nc.nodeStartupGracePeriod
-		if _, found := nc.nodeHealthMap[node.Name]; found {
-			nc.nodeHealthMap[node.Name].status = &node.Status
+		if nodeHealth != nil {
+			nodeHealth.status = &node.Status
 		} else {
-			nc.nodeHealthMap[node.Name] = &nodeHealthData{
+			nodeHealth = &nodeHealthData{
 				status:                   &node.Status,
 				probeTimestamp:           node.CreationTimestamp,
 				readyTransitionTimestamp: node.CreationTimestamp,
@@ -877,7 +924,6 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
 		observedReadyCondition = *currentReadyCondition
 		gracePeriod = nc.nodeMonitorGracePeriod
 	}
-	savedNodeHealth, found := nc.nodeHealthMap[node.Name]
 	// There are following cases to check:
 	// - both saved and new status have no Ready Condition set - we leave everything as it is,
 	// - saved status have no Ready Condition, but current one does - Controller was restarted with Node data already present in etcd,
@@ -894,21 +940,21 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
 	// if that's the case, but it does not seem necessary.
 	var savedCondition *v1.NodeCondition
 	var savedLease *coordv1beta1.Lease
-	if found {
-		_, savedCondition = nodeutil.GetNodeCondition(savedNodeHealth.status, v1.NodeReady)
-		savedLease = savedNodeHealth.lease
+	if nodeHealth != nil {
+		_, savedCondition = nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
+		savedLease = nodeHealth.lease
 	}
 
-	if !found {
+	if nodeHealth == nil {
 		klog.Warningf("Missing timestamp for Node %s. Assuming now as a timestamp.", node.Name)
-		savedNodeHealth = &nodeHealthData{
+		nodeHealth = &nodeHealthData{
 			status:                   &node.Status,
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
 		}
 	} else if savedCondition == nil && currentReadyCondition != nil {
 		klog.V(1).Infof("Creating timestamp entry for newly observed Node %s", node.Name)
-		savedNodeHealth = &nodeHealthData{
+		nodeHealth = &nodeHealthData{
 			status:                   &node.Status,
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
@@ -916,7 +962,7 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
 	} else if savedCondition != nil && currentReadyCondition == nil {
 		klog.Errorf("ReadyCondition was removed from Status of Node %s", node.Name)
 		// TODO: figure out what to do in this case. For now we do the same thing as above.
-		savedNodeHealth = &nodeHealthData{
+		nodeHealth = &nodeHealthData{
 			status:                   &node.Status,
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
@@ -929,14 +975,14 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
 			klog.V(3).Infof("ReadyCondition for Node %s transitioned from %v to %v", node.Name, savedCondition, currentReadyCondition)
 			transitionTime = nc.now()
 		} else {
-			transitionTime = savedNodeHealth.readyTransitionTimestamp
+			transitionTime = nodeHealth.readyTransitionTimestamp
 		}
 		if klog.V(5) {
-			klog.Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, savedNodeHealth.status, node.Status)
+			klog.Infof("Node %s ReadyCondition updated. Updating timestamp: %+v vs %+v.", node.Name, nodeHealth.status, node.Status)
 		} else {
 			klog.V(3).Infof("Node %s ReadyCondition updated. Updating timestamp.", node.Name)
 		}
-		savedNodeHealth = &nodeHealthData{
+		nodeHealth = &nodeHealthData{
 			status:                   &node.Status,
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: transitionTime,
@@ -950,13 +996,12 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
 		// take no action.
 		observedLease, _ = nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
 		if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
-			savedNodeHealth.lease = observedLease
-			savedNodeHealth.probeTimestamp = nc.now()
+			nodeHealth.lease = observedLease
+			nodeHealth.probeTimestamp = nc.now()
 		}
 	}
-	nc.nodeHealthMap[node.Name] = savedNodeHealth
 
-	if nc.now().After(savedNodeHealth.probeTimestamp.Add(gracePeriod)) {
+	if nc.now().After(nodeHealth.probeTimestamp.Add(gracePeriod)) {
 		// NodeReady condition or lease was last set longer ago than gracePeriod, so
 		// update it to Unknown (regardless of its current value) in the master.
 
@@ -984,7 +1029,7 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
 			})
 		} else {
 			klog.V(4).Infof("node %v hasn't been updated for %+v. Last %v is: %+v",
-				node.Name, nc.now().Time.Sub(savedNodeHealth.probeTimestamp.Time), nodeConditionType, currentCondition)
+				node.Name, nc.now().Time.Sub(nodeHealth.probeTimestamp.Time), nodeConditionType, currentCondition)
 			if currentCondition.Status != v1.ConditionUnknown {
 				currentCondition.Status = v1.ConditionUnknown
 				currentCondition.Reason = "NodeStatusUnknown"
@@ -1001,9 +1046,9 @@ func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.Node
 			klog.Errorf("Error updating node %s: %v", node.Name, err)
 			return gracePeriod, observedReadyCondition, currentReadyCondition, err
 		}
-		nc.nodeHealthMap[node.Name] = &nodeHealthData{
+		nodeHealth = &nodeHealthData{
 			status:                   &node.Status,
-			probeTimestamp:           nc.nodeHealthMap[node.Name].probeTimestamp,
+			probeTimestamp:           nodeHealth.probeTimestamp,
 			readyTransitionTimestamp: nc.now(),
 			lease:                    observedLease,
 		}
@@ -1086,10 +1131,10 @@ func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.Nod
 		// When exiting disruption mode update probe timestamps on all Nodes.
 		now := nc.now()
 		for i := range nodes {
-			v := nc.nodeHealthMap[nodes[i].Name]
+			v := nc.nodeHealthMap.getDeepCopy(nodes[i].Name)
 			v.probeTimestamp = now
 			v.readyTransitionTimestamp = now
-			nc.nodeHealthMap[nodes[i].Name] = v
+			nc.nodeHealthMap.set(nodes[i].Name, v)
 		}
 		// We reset all rate limiters to settings appropriate for the given state.
 		for k := range nc.zoneStates {
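A detail worth calling out in tryUpdateNodeHealth above: the deep copy is taken once at entry and written back in a defer, so every return path publishes nodeHealth exactly once, whether it was mutated in place or rebuilt from scratch. A sketch of that shape, reusing the hypothetical healthMap and healthEntry from the earlier example; it assumes a single writer per key, which appears to be what the controller relies on, since getDeepCopy plus set is not an atomic update:

// updateEntry shows the copy-on-read / publish-on-return shape of
// tryUpdateNodeHealth (illustrative, not the controller's code).
func updateEntry(m *healthMap, name string) {
	e := m.getDeepCopy(name)
	// Publish whatever e holds when we return, on every path.
	defer func() { m.set(name, e) }()

	if e == nil {
		// First observation of this key: synthesize a fresh entry.
		e = &healthEntry{probeTimestamp: time.Now()}
		return
	}
	e.probeTimestamp = time.Now() // mutate the private copy freely
}

The remaining hunk below is from the controller's test file, adapting TestTryUpdateNodeHealth to the new accessors.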
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
@@ -3252,19 +3252,19 @@ func TestTryUpdateNodeHealth(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			nodeController.nodeHealthMap[test.node.Name] = &nodeHealthData{
+			nodeController.nodeHealthMap.set(test.node.Name, &nodeHealthData{
 				status:                   &test.node.Status,
 				probeTimestamp:           test.node.CreationTimestamp,
 				readyTransitionTimestamp: test.node.CreationTimestamp,
-			}
+			})
 			_, _, currentReadyCondition, err := nodeController.tryUpdateNodeHealth(test.node)
 			if err != nil {
 				t.Fatalf("unexpected error: %v", err)
 			}
-			_, savedReadyCondition := nodeutil.GetNodeCondition(nodeController.nodeHealthMap[test.node.Name].status, v1.NodeReady)
+			_, savedReadyCondition := nodeutil.GetNodeCondition(nodeController.nodeHealthMap.getDeepCopy(test.node.Name).status, v1.NodeReady)
 			savedStatus := getStatus(savedReadyCondition)
 			currentStatus := getStatus(currentReadyCondition)
-			if currentStatus != savedStatus {
+			if !apiequality.Semantic.DeepEqual(currentStatus, savedStatus) {
 				t.Errorf("expected %v, got %v", savedStatus, currentStatus)
 			}
 		})
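The doc comment on getDeepCopy promises that data retrieved from the map cannot be changed after it leaves the map. That property is easy to check in isolation against the earlier sketch; the following is a hypothetical test (placed in a _test.go file beside the sketch), not part of this PR:

// TestGetDeepCopyIsolation checks that mutating a retrieved copy does not
// leak into the shared map until set is called.
func TestGetDeepCopyIsolation(t *testing.T) {
	m := newHealthMap()
	m.set("node-1", &healthEntry{probeTimestamp: time.Unix(0, 0), ready: true})

	got := m.getDeepCopy("node-1")
	got.ready = false // mutate the copy only

	if !m.getDeepCopy("node-1").ready {
		t.Fatal("mutation of a deep copy leaked into the shared map")
	}
}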