Merge pull request #82365 from jkaniuk/pod-gc

Pod GC controller - use node lister
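Summary (inferred from the diff below): rather than issuing a fresh List of nodes against the API server on every GC pass, the pod GC controller now reads nodes from a shared informer. A pod whose node is absent from the informer cache is no longer deleted immediately: the node name is first quarantined in a delaying workqueue for quarantineTime (40 seconds), and the pod is force-deleted only once a direct GET confirms the node is truly gone. To support this, the controller's RBAC role gains the get verb on nodes, and the workqueue package exports a constructor that accepts a custom clock so tests can drive the quarantine with a fake clock.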
Commit 2c4cba8aa0 by Kubernetes Prow Robot, 2019-10-24 03:13:06 -07:00, committed via GitHub
8 changed files with 328 additions and 116 deletions

cmd/kube-controller-manager/app/core.go

@@ -346,6 +346,7 @@ func startPodGCController(ctx ControllerContext) (http.Handler, bool, error) {
go podgc.NewPodGC(
ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"),
ctx.InformerFactory.Core().V1().Pods(),
+ ctx.InformerFactory.Core().V1().Nodes(),
int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold),
).Run(ctx.Stop)
return nil, true, nil

pkg/controller/podgc/BUILD

@@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
@@ -19,6 +20,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
@@ -34,11 +36,14 @@ go_test(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
],
)

pkg/controller/podgc/gc_controller.go

@@ -22,6 +22,7 @@ import (
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@@ -31,41 +32,53 @@ import (
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog"
)
const (
+ // gcCheckPeriod defines frequency of running main controller loop
gcCheckPeriod = 20 * time.Second
+ // quarantineTime defines how long Orphaned GC waits for nodes to show up
+ // in an informer before issuing a GET call to check if they are truly gone
+ quarantineTime = 40 * time.Second
)
type PodGCController struct {
kubeClient clientset.Interface
- podLister corelisters.PodLister
- podListerSynced cache.InformerSynced
+ podLister corelisters.PodLister
+ podListerSynced cache.InformerSynced
+ nodeLister corelisters.NodeLister
+ nodeListerSynced cache.InformerSynced
+ nodeQueue workqueue.DelayingInterface
deletePod func(namespace, name string) error
terminatedPodThreshold int
}
- func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer, terminatedPodThreshold int) *PodGCController {
+ func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer,
+ nodeInformer coreinformers.NodeInformer, terminatedPodThreshold int) *PodGCController {
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
}
gcc := &PodGCController{
kubeClient: kubeClient,
terminatedPodThreshold: terminatedPodThreshold,
+ podLister: podInformer.Lister(),
+ podListerSynced: podInformer.Informer().HasSynced,
+ nodeLister: nodeInformer.Lister(),
+ nodeListerSynced: nodeInformer.Informer().HasSynced,
+ nodeQueue: workqueue.NewNamedDelayingQueue("orphaned_pods_nodes"),
deletePod: func(namespace, name string) error {
klog.Infof("PodGC is force deleting Pod: %v/%v", namespace, name)
return kubeClient.CoreV1().Pods(namespace).Delete(name, metav1.NewDeleteOptions(0))
},
}
- gcc.podLister = podInformer.Lister()
- gcc.podListerSynced = podInformer.Informer().HasSynced
return gcc
}
@@ -73,9 +86,10 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting GC controller")
+ defer gcc.nodeQueue.ShutDown()
defer klog.Infof("Shutting down GC controller")
- if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced) {
+ if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced, gcc.nodeListerSynced) {
return
}
@@ -87,13 +101,18 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) {
func (gcc *PodGCController) gc() {
pods, err := gcc.podLister.List(labels.Everything())
if err != nil {
- klog.Errorf("Error while listing all Pods: %v", err)
+ klog.Errorf("Error while listing all pods: %v", err)
return
}
+ nodes, err := gcc.nodeLister.List(labels.Everything())
+ if err != nil {
+ klog.Errorf("Error while listing all nodes: %v", err)
+ return
+ }
if gcc.terminatedPodThreshold > 0 {
gcc.gcTerminated(pods)
}
- gcc.gcOrphaned(pods)
+ gcc.gcOrphaned(pods, nodes)
gcc.gcUnscheduledTerminating(pods)
}
@@ -118,12 +137,11 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
if deleteCount > terminatedPodCount {
deleteCount = terminatedPodCount
}
- if deleteCount > 0 {
- klog.Infof("garbage collecting %v pods", deleteCount)
- } else {
+ if deleteCount <= 0 {
return
}
+ klog.Infof("garbage collecting %v pods", deleteCount)
// sort only when necessary
sort.Sort(byCreationTimestamp(terminatedPods))
var wait sync.WaitGroup
@@ -141,23 +159,26 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
}
// gcOrphaned deletes pods that are bound to nodes that don't exist.
- func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) {
+ func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod, nodes []*v1.Node) {
klog.V(4).Infof("GC'ing orphaned")
- // We want to get list of Nodes from the etcd, to make sure that it's as fresh as possible.
- nodes, err := gcc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{})
- if err != nil {
+ existingNodeNames := sets.NewString()
+ for _, node := range nodes {
+ existingNodeNames.Insert(node.Name)
+ }
+ // Add newly found unknown nodes to quarantine
+ for _, pod := range pods {
+ if pod.Spec.NodeName != "" && !existingNodeNames.Has(pod.Spec.NodeName) {
+ gcc.nodeQueue.AddAfter(pod.Spec.NodeName, quarantineTime)
+ }
+ }
+ // Check if nodes are still missing after quarantine period
+ deletedNodesNames, quit := gcc.discoverDeletedNodes(existingNodeNames)
+ if quit {
return
}
- nodeNames := sets.NewString()
- for i := range nodes.Items {
- nodeNames.Insert(nodes.Items[i].Name)
- }
+ // Delete orphaned pods
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
- if nodeNames.Has(pod.Spec.NodeName) {
+ if !deletedNodesNames.Has(pod.Spec.NodeName) {
continue
}
klog.V(2).Infof("Found orphaned Pod %v/%v assigned to the Node %v. Deleting.", pod.Namespace, pod.Name, pod.Spec.NodeName)
@@ -169,6 +190,37 @@ func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) {
}
}
+ func (gcc *PodGCController) discoverDeletedNodes(existingNodeNames sets.String) (sets.String, bool) {
+ deletedNodesNames := sets.NewString()
+ for gcc.nodeQueue.Len() > 0 {
+ item, quit := gcc.nodeQueue.Get()
+ if quit {
+ return nil, true
+ }
+ nodeName := item.(string)
+ if !existingNodeNames.Has(nodeName) {
+ exists, err := gcc.checkIfNodeExists(nodeName)
+ switch {
+ case err != nil:
+ klog.Errorf("Error while getting node %q: %v", nodeName, err)
+ // Node will be added back to the queue in the subsequent loop if still needed
+ case !exists:
+ deletedNodesNames.Insert(nodeName)
+ }
+ }
+ gcc.nodeQueue.Done(item)
+ }
+ return deletedNodesNames, false
+ }
+ func (gcc *PodGCController) checkIfNodeExists(name string) (bool, error) {
+ _, fetchErr := gcc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
+ if errors.IsNotFound(fetchErr) {
+ return false, nil
+ }
+ return fetchErr == nil, fetchErr
+ }
// gcUnscheduledTerminating deletes pods that are terminating and haven't been scheduled to a particular node.
func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) {
klog.V(4).Infof("GC'ing unscheduled pods which are terminating.")
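Note (not part of the change itself): the quarantine above rests entirely on the delaying workqueue's semantics, where an item handed to AddAfter only becomes visible to Len/Get once its delay elapses. Below is a minimal, self-contained sketch of that pattern using the NewDelayingQueueWithCustomClock constructor introduced further down in this PR; the package name, queue name, and printed strings are illustrative only.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Inject a fake clock so the 40-second quarantine can be stepped instantly.
	fakeClock := clock.NewFakeClock(time.Now())
	q := workqueue.NewDelayingQueueWithCustomClock(fakeClock, "quarantine_demo")
	defer q.ShutDown()

	// Park a suspect node name, as gcOrphaned does for names missing from the lister.
	q.AddAfter("missing-node", 40*time.Second)
	fmt.Println("length before quarantine elapses:", q.Len()) // prints 0

	// Advance the fake clock past the quarantine period.
	fakeClock.Step(time.Minute)

	// The queue's background goroutine promotes ready items asynchronously, so
	// poll briefly instead of reading Len() right away (this mirrors waitForAdded
	// in the test file below).
	_ = wait.Poll(time.Millisecond, 10*time.Second, func() (bool, error) {
		return q.Len() == 1, nil
	})

	// Drain whatever became ready, as discoverDeletedNodes does.
	for q.Len() > 0 {
		item, quit := q.Get()
		if quit {
			return
		}
		fmt.Println("ready after quarantine:", item.(string))
		q.Done(item)
	}
}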

pkg/controller/podgc/gc_controller_test.go

@@ -21,14 +21,17 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/testutil"
)
@@ -47,12 +50,25 @@ func (*FakeController) LastSyncResourceVersion() string {
func alwaysReady() bool { return true }
- func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) (*PodGCController, coreinformers.PodInformer) {
+ func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) (*PodGCController, coreinformers.PodInformer, coreinformers.NodeInformer) {
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
podInformer := informerFactory.Core().V1().Pods()
- controller := NewPodGC(kubeClient, podInformer, terminatedPodThreshold)
+ nodeInformer := informerFactory.Core().V1().Nodes()
+ controller := NewPodGC(kubeClient, podInformer, nodeInformer, terminatedPodThreshold)
controller.podListerSynced = alwaysReady
- return controller, podInformer
+ return controller, podInformer, nodeInformer
}
+ func compareStringSetToList(set sets.String, list []string) bool {
+ for _, item := range list {
+ if !set.Has(item) {
+ return false
+ }
+ }
+ if len(list) != len(set) {
+ return false
+ }
+ return true
+ }
func TestGCTerminated(t *testing.T) {
@@ -113,7 +129,7 @@ func TestGCTerminated(t *testing.T) {
for i, test := range testCases {
client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node")}})
- gcc, podInformer := NewFromClient(client, test.threshold)
+ gcc, podInformer, _ := NewFromClient(client, test.threshold)
deletedPodNames := make([]string, 0)
var lock sync.Mutex
gcc.deletePod = func(_, name string) error {
@@ -135,90 +151,232 @@ func TestGCTerminated(t *testing.T) {
gcc.gc()
- pass := true
- for _, pod := range deletedPodNames {
- if !test.deletedPodNames.Has(pod) {
- pass = false
- }
- }
- if len(deletedPodNames) != len(test.deletedPodNames) {
- pass = false
- }
- if !pass {
- t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames, deletedPodNames)
+ if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
+ t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v",
+ i, test.deletedPodNames.List(), deletedPodNames)
}
}
}
- func TestGCOrphaned(t *testing.T) {
- type nameToPhase struct {
- name string
- phase v1.PodPhase
+ func makePod(name string, nodeName string, phase v1.PodPhase) *v1.Pod {
+ return &v1.Pod{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ },
+ Spec: v1.PodSpec{NodeName: nodeName},
+ Status: v1.PodStatus{Phase: phase},
+ }
+ }
+ func waitForAdded(q workqueue.DelayingInterface, depth int) error {
+ return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
+ if q.Len() == depth {
+ return true, nil
+ }
+ return false, nil
+ })
+ }
+ func TestGCOrphaned(t *testing.T) {
testCases := []struct {
- pods []nameToPhase
- threshold int
- deletedPodNames sets.String
+ name string
+ initialClientNodes []*v1.Node
+ initialInformerNodes []*v1.Node
+ delay time.Duration
+ addedClientNodes []*v1.Node
+ deletedClientNodes []*v1.Node
+ addedInformerNodes []*v1.Node
+ deletedInformerNodes []*v1.Node
+ pods []*v1.Pod
+ itemsInQueue int
+ deletedPodNames sets.String
}{
{
- pods: []nameToPhase{
- {name: "a", phase: v1.PodFailed},
- {name: "b", phase: v1.PodSucceeded},
+ name: "nodes present in lister",
+ initialInformerNodes: []*v1.Node{
+ testutil.NewNode("existing1"),
+ testutil.NewNode("existing2"),
+ },
- threshold: 0,
+ delay: 2 * quarantineTime,
+ pods: []*v1.Pod{
+ makePod("a", "existing1", v1.PodRunning),
+ makePod("b", "existing2", v1.PodFailed),
+ makePod("c", "existing2", v1.PodSucceeded),
+ },
+ itemsInQueue: 0,
+ deletedPodNames: sets.NewString(),
},
+ {
+ name: "nodes present in client",
+ initialClientNodes: []*v1.Node{
+ testutil.NewNode("existing1"),
+ testutil.NewNode("existing2"),
+ },
+ delay: 2 * quarantineTime,
+ pods: []*v1.Pod{
+ makePod("a", "existing1", v1.PodRunning),
+ makePod("b", "existing2", v1.PodFailed),
+ makePod("c", "existing2", v1.PodSucceeded),
+ },
+ itemsInQueue: 2,
+ deletedPodNames: sets.NewString(),
+ },
+ {
+ name: "no nodes",
+ delay: 2 * quarantineTime,
+ pods: []*v1.Pod{
+ makePod("a", "deleted", v1.PodFailed),
+ makePod("b", "deleted", v1.PodSucceeded),
+ },
+ itemsInQueue: 1,
deletedPodNames: sets.NewString("a", "b"),
},
{
- pods: []nameToPhase{
- {name: "a", phase: v1.PodRunning},
+ name: "quarantine not finished",
+ delay: quarantineTime / 2,
+ pods: []*v1.Pod{
+ makePod("a", "deleted", v1.PodFailed),
+ },
- threshold: 1,
+ itemsInQueue: 0,
deletedPodNames: sets.NewString(),
},
+ {
+ name: "wrong nodes",
+ initialInformerNodes: []*v1.Node{testutil.NewNode("existing")},
+ delay: 2 * quarantineTime,
+ pods: []*v1.Pod{
+ makePod("a", "deleted", v1.PodRunning),
+ },
+ itemsInQueue: 1,
+ deletedPodNames: sets.NewString("a"),
+ },
+ {
+ name: "some nodes missing",
+ initialInformerNodes: []*v1.Node{testutil.NewNode("existing")},
+ delay: 2 * quarantineTime,
+ pods: []*v1.Pod{
+ makePod("a", "deleted", v1.PodFailed),
+ makePod("b", "existing", v1.PodFailed),
+ makePod("c", "deleted", v1.PodSucceeded),
+ makePod("d", "deleted", v1.PodRunning),
+ },
+ itemsInQueue: 1,
+ deletedPodNames: sets.NewString("a", "c", "d"),
+ },
+ {
+ name: "node added to client after quarantine",
+ delay: 2 * quarantineTime,
+ addedClientNodes: []*v1.Node{testutil.NewNode("node")},
+ pods: []*v1.Pod{
+ makePod("a", "node", v1.PodRunning),
+ },
+ itemsInQueue: 1,
+ deletedPodNames: sets.NewString(),
+ },
+ {
+ name: "node added to informer after quarantine",
+ delay: 2 * quarantineTime,
+ addedInformerNodes: []*v1.Node{testutil.NewNode("node")},
+ pods: []*v1.Pod{
+ makePod("a", "node", v1.PodFailed),
+ },
+ itemsInQueue: 1,
+ deletedPodNames: sets.NewString(),
+ },
+ {
+ // It shouldn't happen that client will be lagging behind informer.
+ // This test case is more a sanity check.
+ name: "node deleted from client after quarantine",
+ initialClientNodes: []*v1.Node{testutil.NewNode("node")},
+ delay: 2 * quarantineTime,
+ deletedClientNodes: []*v1.Node{testutil.NewNode("node")},
+ pods: []*v1.Pod{
+ makePod("a", "node", v1.PodFailed),
+ },
+ itemsInQueue: 1,
+ deletedPodNames: sets.NewString("a"),
+ },
+ {
+ name: "node deleted from informer after quarantine",
+ initialInformerNodes: []*v1.Node{testutil.NewNode("node")},
+ delay: 2 * quarantineTime,
+ deletedInformerNodes: []*v1.Node{testutil.NewNode("node")},
+ pods: []*v1.Pod{
+ makePod("a", "node", v1.PodSucceeded),
+ },
+ itemsInQueue: 0,
+ deletedPodNames: sets.NewString(),
+ },
}
- for i, test := range testCases {
- client := fake.NewSimpleClientset()
- gcc, podInformer := NewFromClient(client, test.threshold)
- deletedPodNames := make([]string, 0)
- var lock sync.Mutex
- gcc.deletePod = func(_, name string) error {
- lock.Lock()
- defer lock.Unlock()
- deletedPodNames = append(deletedPodNames, name)
- return nil
- }
- creationTime := time.Unix(0, 0)
- for _, pod := range test.pods {
- creationTime = creationTime.Add(1 * time.Hour)
- podInformer.Informer().GetStore().Add(&v1.Pod{
- ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime}},
- Status: v1.PodStatus{Phase: pod.phase},
- Spec: v1.PodSpec{NodeName: "node"},
- })
- }
- pods, err := podInformer.Lister().List(labels.Everything())
- if err != nil {
- t.Errorf("Error while listing all Pods: %v", err)
- return
- }
- gcc.gcOrphaned(pods)
- pass := true
- for _, pod := range deletedPodNames {
- if !test.deletedPodNames.Has(pod) {
- pass = false
+ for _, test := range testCases {
+ t.Run(test.name, func(t *testing.T) {
+ nodeList := &v1.NodeList{}
+ for _, node := range test.initialClientNodes {
+ nodeList.Items = append(nodeList.Items, *node)
}
- }
- if len(deletedPodNames) != len(test.deletedPodNames) {
- pass = false
- }
- if !pass {
- t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames, deletedPodNames)
- }
+ client := fake.NewSimpleClientset(nodeList)
+ gcc, podInformer, nodeInformer := NewFromClient(client, -1)
+ for _, node := range test.initialInformerNodes {
+ nodeInformer.Informer().GetStore().Add(node)
+ }
+ for _, pod := range test.pods {
+ podInformer.Informer().GetStore().Add(pod)
+ }
+ // Overwrite queue
+ fakeClock := clock.NewFakeClock(time.Now())
+ gcc.nodeQueue.ShutDown()
+ gcc.nodeQueue = workqueue.NewDelayingQueueWithCustomClock(fakeClock, "podgc_test_queue")
+ deletedPodNames := make([]string, 0)
+ var lock sync.Mutex
+ gcc.deletePod = func(_, name string) error {
+ lock.Lock()
+ defer lock.Unlock()
+ deletedPodNames = append(deletedPodNames, name)
+ return nil
+ }
+ // First GC of orphaned pods
+ gcc.gc()
+ if len(deletedPodNames) > 0 {
+ t.Errorf("no pods should be deleted at this point.\n\tactual: %v", deletedPodNames)
+ }
+ // Move clock forward
+ fakeClock.Step(test.delay)
+ // Wait for queue goroutine to process items
+ if test.itemsInQueue > 0 {
+ err := waitForAdded(gcc.nodeQueue, test.itemsInQueue)
+ if err != nil {
+ t.Errorf("wrong number of items in the node queue.\n\texpected: %v\n\tactual: %v",
+ test.itemsInQueue, gcc.nodeQueue.Len())
+ }
+ }
+ // Execute planned nodes changes
+ for _, node := range test.addedClientNodes {
+ client.CoreV1().Nodes().Create(node)
+ }
+ for _, node := range test.deletedClientNodes {
+ client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{})
+ }
+ for _, node := range test.addedInformerNodes {
+ nodeInformer.Informer().GetStore().Add(node)
+ }
+ for _, node := range test.deletedInformerNodes {
+ nodeInformer.Informer().GetStore().Delete(node)
+ }
+ // Actual pod deletion
+ gcc.gc()
+ if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
+ t.Errorf("pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v",
+ test.deletedPodNames.List(), deletedPodNames)
+ }
+ })
}
}
@@ -257,7 +415,7 @@ func TestGCUnscheduledTerminating(t *testing.T) {
for i, test := range testCases {
client := fake.NewSimpleClientset()
- gcc, podInformer := NewFromClient(client, -1)
+ gcc, podInformer, _ := NewFromClient(client, -1)
deletedPodNames := make([]string, 0)
var lock sync.Mutex
gcc.deletePod = func(_, name string) error {
@@ -285,17 +443,9 @@ func TestGCUnscheduledTerminating(t *testing.T) {
}
gcc.gcUnscheduledTerminating(pods)
- pass := true
- for _, pod := range deletedPodNames {
- if !test.deletedPodNames.Has(pod) {
- pass = false
- }
- }
- if len(deletedPodNames) != len(test.deletedPodNames) {
- pass = false
- }
- if !pass {
- t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v, test: %v", i, test.deletedPodNames, deletedPodNames, test.name)
+ if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
+ t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v, test: %v",
+ i, test.deletedPodNames.List(), deletedPodNames, test.name)
}
}
}
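Design note: the rewritten TestGCOrphaned drives the quarantine deterministically. It swaps the controller's delaying queue for one built on a fake clock, runs a first GC pass to enqueue suspect node names, steps the clock by test.delay, waits for the queue goroutine with waitForAdded, applies the planned client and informer node changes, and only then runs the second GC pass that performs the actual deletions.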

plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go

@@ -255,7 +255,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "pod-garbage-collector"},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(),
- rbacv1helpers.NewRule("list").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
+ rbacv1helpers.NewRule("get", "list").Groups(legacyGroup).Resources("nodes").RuleOrDie(),
},
})
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
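The extra get verb supports the new checkIfNodeExists path in the controller, which issues a direct GET for each quarantined node name to confirm the node is really gone before its pods are force-deleted.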

plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml

@@ -872,6 +872,7 @@ items:
resources:
- nodes
verbs:
+ - get
- list
- apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole

staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go

@@ -35,14 +35,17 @@ type DelayingInterface interface {
// NewDelayingQueue constructs a new workqueue with delayed queuing ability
func NewDelayingQueue() DelayingInterface {
return newDelayingQueue(clock.RealClock{}, "")
return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
}
// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
func NewNamedDelayingQueue(name string) DelayingInterface {
- return newDelayingQueue(clock.RealClock{}, name)
+ return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
}
- func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
+ // NewDelayingQueueWithCustomClock constructs a new named workqueue
+ // with ability to inject real or fake clock for testing purposes
+ func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
ret := &delayingType{
Interface: NewNamed(name),
clock: clock,

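Exporting the formerly package-private newDelayingQueue constructor as NewDelayingQueueWithCustomClock is what allows the pod GC test above to replace the controller's queue with one driven by a fake clock; the returned queue's behavior is unchanged.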
staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go

@@ -29,7 +29,7 @@ import (
func TestSimpleQueue(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
- q := newDelayingQueue(fakeClock, "")
+ q := NewDelayingQueueWithCustomClock(fakeClock, "")
first := "foo"
@@ -71,7 +71,7 @@ func TestSimpleQueue(t *testing.T) {
func TestDeduping(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
- q := newDelayingQueue(fakeClock, "")
+ q := NewDelayingQueueWithCustomClock(fakeClock, "")
first := "foo"
@@ -130,7 +130,7 @@ func TestDeduping(t *testing.T) {
func TestAddTwoFireEarly(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
- q := newDelayingQueue(fakeClock, "")
+ q := NewDelayingQueueWithCustomClock(fakeClock, "")
first := "foo"
second := "bar"
@@ -179,7 +179,7 @@ func TestAddTwoFireEarly(t *testing.T) {
func TestCopyShifting(t *testing.T) {
fakeClock := clock.NewFakeClock(time.Now())
- q := newDelayingQueue(fakeClock, "")
+ q := NewDelayingQueueWithCustomClock(fakeClock, "")
first := "foo"
second := "bar"
@@ -217,7 +217,7 @@ func TestCopyShifting(t *testing.T) {
func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
fakeClock := clock.NewFakeClock(time.Now())
- q := newDelayingQueue(fakeClock, "")
+ q := NewDelayingQueueWithCustomClock(fakeClock, "")
// Add items
for n := 0; n < b.N; n++ {