mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Review comments
This commit is contained in:
parent
8a62f1828d
commit
940cc2837c
@ -149,6 +149,18 @@ func (nc *NodeController) Run(period time.Duration) {
|
|||||||
}
|
}
|
||||||
}, nc.nodeMonitorPeriod, util.NeverStop)
|
}, nc.nodeMonitorPeriod, util.NeverStop)
|
||||||
|
|
||||||
|
// Managing eviction of nodes:
|
||||||
|
// 1. when we delete pods off a node, if the node was not empty at the time we then
|
||||||
|
// queue a termination watcher
|
||||||
|
// a. If we hit an error, retry deletion
|
||||||
|
// 2. The terminator loop ensures that pods are eventually cleaned and we never
|
||||||
|
// terminate a pod in a time period less than nc.maximumGracePeriod. AddedAt
|
||||||
|
// is the time from which we measure "has this pod been terminating too long",
|
||||||
|
// after which we will delete the pod with grace period 0 (force delete).
|
||||||
|
// a. If we hit errors, retry instantly
|
||||||
|
// b. If there are no pods left terminating, exit
|
||||||
|
// c. If there are pods still terminating, wait for their estimated completion
|
||||||
|
// before retrying
|
||||||
go util.Until(func() {
|
go util.Until(func() {
|
||||||
nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
|
nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
|
||||||
remaining, err := nc.deletePods(value.Value)
|
remaining, err := nc.deletePods(value.Value)
|
||||||
@ -167,19 +179,19 @@ func (nc *NodeController) Run(period time.Duration) {
|
|||||||
// in a particular time period
|
// in a particular time period
|
||||||
go util.Until(func() {
|
go util.Until(func() {
|
||||||
nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
|
nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
|
||||||
completed, remaining, err := nc.terminatePods(value.Value, value.Added)
|
completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
|
util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
|
||||||
return false, 0
|
return false, 0
|
||||||
}
|
}
|
||||||
|
|
||||||
if completed {
|
if completed {
|
||||||
glog.V(2).Infof("All pods terminated on %s", value.Value)
|
glog.Infof("All pods terminated on %s", value.Value)
|
||||||
nc.recordNodeEvent(value.Value, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
|
nc.recordNodeEvent(value.Value, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
|
||||||
return true, 0
|
return true, 0
|
||||||
}
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.Added, value.Value, remaining)
|
glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
|
||||||
// clamp very short intervals
|
// clamp very short intervals
|
||||||
if remaining < nodeEvictionPeriod {
|
if remaining < nodeEvictionPeriod {
|
||||||
remaining = nodeEvictionPeriod
|
remaining = nodeEvictionPeriod
|
||||||
@ -228,6 +240,7 @@ func (nc *NodeController) monitorNodeStatus() error {
|
|||||||
if !nc.knownNodeSet.Has(node.Name) {
|
if !nc.knownNodeSet.Has(node.Name) {
|
||||||
glog.V(1).Infof("NodeController observed a new Node: %#v", node)
|
glog.V(1).Infof("NodeController observed a new Node: %#v", node)
|
||||||
nc.recordNodeEvent(node.Name, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
|
nc.recordNodeEvent(node.Name, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
|
||||||
|
nc.cancelPodEviction(node.Name)
|
||||||
nc.knownNodeSet.Insert(node.Name)
|
nc.knownNodeSet.Insert(node.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -239,11 +252,11 @@ func (nc *NodeController) monitorNodeStatus() error {
|
|||||||
observedSet.Insert(node.Name)
|
observedSet.Insert(node.Name)
|
||||||
}
|
}
|
||||||
deleted := nc.knownNodeSet.Difference(observedSet)
|
deleted := nc.knownNodeSet.Difference(observedSet)
|
||||||
for node := range deleted {
|
for nodeName := range deleted {
|
||||||
glog.V(1).Infof("NodeController observed a Node deletion: %v", node)
|
glog.V(1).Infof("NodeController observed a Node deletion: %v", nodeName)
|
||||||
nc.recordNodeEvent(node, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", node))
|
nc.recordNodeEvent(nodeName, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
|
||||||
nc.podEvictor.Add(node)
|
nc.evictPods(nodeName)
|
||||||
nc.knownNodeSet.Delete(node)
|
nc.knownNodeSet.Delete(nodeName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,21 +297,19 @@ func (nc *NodeController) monitorNodeStatus() error {
|
|||||||
// Check eviction timeout against decisionTimestamp
|
// Check eviction timeout against decisionTimestamp
|
||||||
if lastReadyCondition.Status == api.ConditionFalse &&
|
if lastReadyCondition.Status == api.ConditionFalse &&
|
||||||
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
|
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
|
||||||
if nc.podEvictor.Add(node.Name) {
|
if nc.evictPods(node.Name) {
|
||||||
glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
|
glog.Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if lastReadyCondition.Status == api.ConditionUnknown &&
|
if lastReadyCondition.Status == api.ConditionUnknown &&
|
||||||
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
|
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
|
||||||
if nc.podEvictor.Add(node.Name) {
|
if nc.evictPods(node.Name) {
|
||||||
glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
|
glog.Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if lastReadyCondition.Status == api.ConditionTrue {
|
if lastReadyCondition.Status == api.ConditionTrue {
|
||||||
wasDeleting := nc.podEvictor.Remove(node.Name)
|
if nc.cancelPodEviction(node.Name) {
|
||||||
wasTerminating := nc.terminationEvictor.Remove(node.Name)
|
glog.Infof("Node %s is ready again, cancelled pod eviction", node.Name)
|
||||||
if wasDeleting || wasTerminating {
|
|
||||||
glog.Infof("Pods on %v won't be evicted", node.Name)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -326,8 +337,8 @@ func (nc *NodeController) monitorNodeStatus() error {
|
|||||||
}
|
}
|
||||||
if remaining {
|
if remaining {
|
||||||
// queue eviction of the pods on the node
|
// queue eviction of the pods on the node
|
||||||
glog.Infof("Deleting node %s is delayed while pods are evicted", node.Name)
|
glog.V(2).Infof("Deleting node %s is delayed while pods are evicted", node.Name)
|
||||||
nc.podEvictor.Add(node.Name)
|
nc.evictPods(node.Name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -529,30 +540,44 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
|
|||||||
|
|
||||||
// returns true if the provided node still has pods scheduled to it, or an error if
|
// returns true if the provided node still has pods scheduled to it, or an error if
|
||||||
// the server could not be contacted.
|
// the server could not be contacted.
|
||||||
func (nc *NodeController) hasPods(nodeID string) (bool, error) {
|
func (nc *NodeController) hasPods(nodeName string) (bool, error) {
|
||||||
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeID))
|
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
return len(pods.Items) > 0, nil
|
return len(pods.Items) > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// evictPods queues an eviction for the provided node name, and returns false if the node is already
|
||||||
|
// queued for eviction.
|
||||||
|
func (nc *NodeController) evictPods(nodeName string) bool {
|
||||||
|
return nc.podEvictor.Add(nodeName)
|
||||||
|
}
|
||||||
|
|
||||||
|
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
|
||||||
|
// returns true if an eviction was queued.
|
||||||
|
func (nc *NodeController) cancelPodEviction(nodeName string) bool {
|
||||||
|
wasDeleting := nc.podEvictor.Remove(nodeName)
|
||||||
|
wasTerminating := nc.terminationEvictor.Remove(nodeName)
|
||||||
|
return wasDeleting || wasTerminating
|
||||||
|
}
|
||||||
|
|
||||||
// deletePods will delete all pods from master running on given node, and return true
|
// deletePods will delete all pods from master running on given node, and return true
|
||||||
// if any pods were deleted.
|
// if any pods were deleted.
|
||||||
func (nc *NodeController) deletePods(nodeID string) (bool, error) {
|
func (nc *NodeController) deletePods(nodeName string) (bool, error) {
|
||||||
remaining := false
|
remaining := false
|
||||||
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeID))
|
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return remaining, err
|
return remaining, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(pods.Items) > 0 {
|
if len(pods.Items) > 0 {
|
||||||
nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
|
nc.recordNodeEvent(nodeName, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, pod := range pods.Items {
|
for _, pod := range pods.Items {
|
||||||
// Defensive check, also needed for tests.
|
// Defensive check, also needed for tests.
|
||||||
if pod.Spec.NodeName != nodeID {
|
if pod.Spec.NodeName != nodeName {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// if the pod has already been deleted, ignore it
|
// if the pod has already been deleted, ignore it
|
||||||
@ -561,7 +586,7 @@ func (nc *NodeController) deletePods(nodeID string) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
glog.V(2).Infof("Delete pod %v", pod.Name)
|
glog.V(2).Infof("Delete pod %v", pod.Name)
|
||||||
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
|
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeName)
|
||||||
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
|
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -571,22 +596,25 @@ func (nc *NodeController) deletePods(nodeID string) (bool, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// terminatePods will ensure all pods on the given node that are in terminating state are eventually
|
// terminatePods will ensure all pods on the given node that are in terminating state are eventually
|
||||||
// cleaned up
|
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
|
||||||
func (nc *NodeController) terminatePods(nodeID string, since time.Time) (bool, time.Duration, error) {
|
// long before we should check again (the next deadline for a pod to complete), or an error.
|
||||||
remaining := time.Duration(0)
|
func (nc *NodeController) terminatePods(nodeName string, since time.Time) (bool, time.Duration, error) {
|
||||||
|
// the time before we should try again
|
||||||
|
nextAttempt := time.Duration(0)
|
||||||
|
// have we deleted all pods
|
||||||
complete := true
|
complete := true
|
||||||
|
|
||||||
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
|
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
|
||||||
fields.OneTermEqualSelector(client.PodHost, nodeID))
|
fields.OneTermEqualSelector(client.PodHost, nodeName))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, remaining, err
|
return false, nextAttempt, err
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
elapsed := now.Sub(since)
|
elapsed := now.Sub(since)
|
||||||
for _, pod := range pods.Items {
|
for _, pod := range pods.Items {
|
||||||
// Defensive check, also needed for tests.
|
// Defensive check, also needed for tests.
|
||||||
if pod.Spec.NodeName != nodeID {
|
if pod.Spec.NodeName != nodeName {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// only clean terminated pods
|
// only clean terminated pods
|
||||||
@ -594,28 +622,30 @@ func (nc *NodeController) terminatePods(nodeID string, since time.Time) (bool, t
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the user's requested grace period
|
||||||
grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
|
grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
|
||||||
if grace > nc.maximumGracePeriod {
|
if grace > nc.maximumGracePeriod {
|
||||||
grace = nc.maximumGracePeriod
|
grace = nc.maximumGracePeriod
|
||||||
}
|
}
|
||||||
next := grace - elapsed
|
|
||||||
|
|
||||||
if next < 0 {
|
// the time remaining before the pod should have been deleted
|
||||||
next = 0
|
remaining := grace - elapsed
|
||||||
|
if remaining < 0 {
|
||||||
|
remaining = 0
|
||||||
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
|
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
|
||||||
nc.recordNodeEvent(nodeID, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeID))
|
nc.recordNodeEvent(nodeName, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
|
||||||
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
|
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
|
||||||
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
|
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
|
||||||
complete = false
|
complete = false
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
glog.V(2).Infof("Pod %v still terminating with %s remaining", pod.Name, next)
|
glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
|
||||||
complete = false
|
complete = false
|
||||||
}
|
}
|
||||||
|
|
||||||
if remaining < next {
|
if nextAttempt < remaining {
|
||||||
remaining = next
|
nextAttempt = remaining
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return complete, remaining, nil
|
return complete, nextAttempt, nil
|
||||||
}
|
}
|
||||||
|
@ -347,7 +347,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
|
|||||||
return true, 0
|
return true, 0
|
||||||
})
|
})
|
||||||
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
|
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
|
||||||
nodeController.terminatePods(value.Value, value.Added)
|
nodeController.terminatePods(value.Value, value.AddedAt)
|
||||||
return true, 0
|
return true, 0
|
||||||
})
|
})
|
||||||
podEvicted := false
|
podEvicted := false
|
||||||
|
@ -27,18 +27,18 @@ import (
|
|||||||
// TimedValue is a value that should be processed at a designated time.
|
// TimedValue is a value that should be processed at a designated time.
|
||||||
type TimedValue struct {
|
type TimedValue struct {
|
||||||
Value string
|
Value string
|
||||||
Added time.Time
|
AddedAt time.Time
|
||||||
Next time.Time
|
ProcessAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// now is used to test time
|
// now is used to test time
|
||||||
var now func() time.Time = time.Now
|
var now func() time.Time = time.Now
|
||||||
|
|
||||||
// TimedQueue is a priority heap where the lowest Next is at the front of the queue
|
// TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
|
||||||
type TimedQueue []*TimedValue
|
type TimedQueue []*TimedValue
|
||||||
|
|
||||||
func (h TimedQueue) Len() int { return len(h) }
|
func (h TimedQueue) Len() int { return len(h) }
|
||||||
func (h TimedQueue) Less(i, j int) bool { return h[i].Next.Before(h[j].Next) }
|
func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
|
||||||
func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
||||||
|
|
||||||
func (h *TimedQueue) Push(x interface{}) {
|
func (h *TimedQueue) Push(x interface{}) {
|
||||||
@ -75,6 +75,23 @@ func (q *UniqueQueue) Add(value TimedValue) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Replace replaces an existing value in the queue if it already exists, otherwise it does nothing.
|
||||||
|
// Returns true if the item was found.
|
||||||
|
func (q *UniqueQueue) Replace(value TimedValue) bool {
|
||||||
|
q.lock.Lock()
|
||||||
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
|
for i := range q.queue {
|
||||||
|
if q.queue[i].Value != value.Value {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
heap.Remove(&q.queue, i)
|
||||||
|
heap.Push(&q.queue, &value)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
|
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
|
||||||
// of the given value. If the value is not present does nothing and returns false.
|
// of the given value. If the value is not present does nothing and returns false.
|
||||||
func (q *UniqueQueue) Remove(value string) bool {
|
func (q *UniqueQueue) Remove(value string) bool {
|
||||||
@ -103,6 +120,17 @@ func (q *UniqueQueue) Get() (TimedValue, bool) {
|
|||||||
return *result, true
|
return *result, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Head returns the oldest added value that wasn't returned yet without removing it.
|
||||||
|
func (q *UniqueQueue) Head() (TimedValue, bool) {
|
||||||
|
q.lock.Lock()
|
||||||
|
defer q.lock.Unlock()
|
||||||
|
if len(q.queue) == 0 {
|
||||||
|
return TimedValue{}, false
|
||||||
|
}
|
||||||
|
result := q.queue[0]
|
||||||
|
return *result, true
|
||||||
|
}
|
||||||
|
|
||||||
// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
|
// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
|
||||||
// of execution. It is also rate limited.
|
// of execution. It is also rate limited.
|
||||||
type RateLimitedTimedQueue struct {
|
type RateLimitedTimedQueue struct {
|
||||||
@ -133,7 +161,7 @@ type ActionFunc func(TimedValue) (bool, time.Duration)
|
|||||||
// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
|
// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
|
||||||
// time to execute the next item in the queue.
|
// time to execute the next item in the queue.
|
||||||
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
|
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
|
||||||
val, ok := q.queue.Get()
|
val, ok := q.queue.Head()
|
||||||
for ok {
|
for ok {
|
||||||
// rate limit the queue checking
|
// rate limit the queue checking
|
||||||
if q.leak {
|
if q.leak {
|
||||||
@ -145,18 +173,20 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
now := now()
|
now := now()
|
||||||
if now.Before(val.Next) {
|
if now.Before(val.ProcessAt) {
|
||||||
q.queue.Add(val)
|
q.queue.Replace(val)
|
||||||
val, ok = q.queue.Get()
|
val, ok = q.queue.Head()
|
||||||
// we do not sleep here because other values may be added at the front of the queue
|
// we do not sleep here because other values may be added at the front of the queue
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok, wait := fn(val); !ok {
|
if ok, wait := fn(val); !ok {
|
||||||
val.Next = now.Add(wait + 1)
|
val.ProcessAt = now.Add(wait + 1)
|
||||||
q.queue.Add(val)
|
q.queue.Replace(val)
|
||||||
|
} else {
|
||||||
|
q.queue.Remove(val.Value)
|
||||||
}
|
}
|
||||||
val, ok = q.queue.Get()
|
val, ok = q.queue.Head()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,8 +196,8 @@ func (q *RateLimitedTimedQueue) Add(value string) bool {
|
|||||||
now := now()
|
now := now()
|
||||||
return q.queue.Add(TimedValue{
|
return q.queue.Add(TimedValue{
|
||||||
Value: value,
|
Value: value,
|
||||||
Added: now,
|
AddedAt: now,
|
||||||
Next: now,
|
ProcessAt: now,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,10 +161,10 @@ func TestTryOrdering(t *testing.T) {
|
|||||||
queued := false
|
queued := false
|
||||||
evictor.Try(func(value TimedValue) (bool, time.Duration) {
|
evictor.Try(func(value TimedValue) (bool, time.Duration) {
|
||||||
count++
|
count++
|
||||||
if value.Added.IsZero() {
|
if value.AddedAt.IsZero() {
|
||||||
t.Fatalf("added should not be zero")
|
t.Fatalf("added should not be zero")
|
||||||
}
|
}
|
||||||
if value.Next.IsZero() {
|
if value.ProcessAt.IsZero() {
|
||||||
t.Fatalf("next should not be zero")
|
t.Fatalf("next should not be zero")
|
||||||
}
|
}
|
||||||
if !queued && value.Value == "second" {
|
if !queued && value.Value == "second" {
|
||||||
@ -181,3 +181,49 @@ func TestTryOrdering(t *testing.T) {
|
|||||||
t.Fatalf("unexpected iterations: %d", count)
|
t.Fatalf("unexpected iterations: %d", count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTryRemovingWhileTry(t *testing.T) {
|
||||||
|
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
|
||||||
|
evictor.Add("first")
|
||||||
|
evictor.Add("second")
|
||||||
|
evictor.Add("third")
|
||||||
|
|
||||||
|
processing := make(chan struct{})
|
||||||
|
wait := make(chan struct{})
|
||||||
|
order := []string{}
|
||||||
|
count := 0
|
||||||
|
queued := false
|
||||||
|
|
||||||
|
// while the Try function is processing "second", remove it from the queue
|
||||||
|
// we should not see "second" retried.
|
||||||
|
go func() {
|
||||||
|
<-processing
|
||||||
|
evictor.Remove("second")
|
||||||
|
close(wait)
|
||||||
|
}()
|
||||||
|
|
||||||
|
evictor.Try(func(value TimedValue) (bool, time.Duration) {
|
||||||
|
count++
|
||||||
|
if value.AddedAt.IsZero() {
|
||||||
|
t.Fatalf("added should not be zero")
|
||||||
|
}
|
||||||
|
if value.ProcessAt.IsZero() {
|
||||||
|
t.Fatalf("next should not be zero")
|
||||||
|
}
|
||||||
|
if !queued && value.Value == "second" {
|
||||||
|
queued = true
|
||||||
|
close(processing)
|
||||||
|
<-wait
|
||||||
|
return false, time.Millisecond
|
||||||
|
}
|
||||||
|
order = append(order, value.Value)
|
||||||
|
return true, 0
|
||||||
|
})
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(order, []string{"first", "third"}) {
|
||||||
|
t.Fatalf("order was wrong: %v", order)
|
||||||
|
}
|
||||||
|
if count != 3 {
|
||||||
|
t.Fatalf("unexpected iterations: %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user