diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 07daa9251e9..9689fa40431 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -346,6 +346,7 @@ func startPodGCController(ctx ControllerContext) (http.Handler, bool, error) { go podgc.NewPodGC( ctx.ClientBuilder.ClientOrDie("pod-garbage-collector"), ctx.InformerFactory.Core().V1().Pods(), + ctx.InformerFactory.Core().V1().Nodes(), int(ctx.ComponentConfig.PodGCController.TerminatedPodGCThreshold), ).Run(ctx.Stop) return nil, true, nil diff --git a/pkg/controller/podgc/BUILD b/pkg/controller/podgc/BUILD index 2a10e7eb2f2..12b277432e8 100644 --- a/pkg/controller/podgc/BUILD +++ b/pkg/controller/podgc/BUILD @@ -10,6 +10,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", @@ -19,6 +20,7 @@ go_library( "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -34,11 +36,14 @@ go_test( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", ], ) diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go index 02820611b26..ab1a060f242 100644 --- a/pkg/controller/podgc/gc_controller.go +++ b/pkg/controller/podgc/gc_controller.go @@ -22,6 +22,7 @@ import ( "time" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -31,41 +32,53 @@ import ( clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog" ) const ( + // gcCheckPeriod defines frequency of running main controller loop gcCheckPeriod = 20 * time.Second + // quarantineTime defines how long Orphaned GC waits for nodes to show up + // in an informer before issuing a GET call to check if they are truly gone + quarantineTime = 40 * time.Second ) type PodGCController struct { kubeClient clientset.Interface - podLister corelisters.PodLister - podListerSynced cache.InformerSynced + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced + + nodeQueue workqueue.DelayingInterface deletePod func(namespace, name string) error terminatedPodThreshold int } -func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer, terminatedPodThreshold int) *PodGCController { +func NewPodGC(kubeClient clientset.Interface, podInformer coreinformers.PodInformer, + nodeInformer coreinformers.NodeInformer, terminatedPodThreshold int) *PodGCController { if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { ratelimiter.RegisterMetricAndTrackRateLimiterUsage("gc_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()) } gcc := &PodGCController{ kubeClient: kubeClient, terminatedPodThreshold: terminatedPodThreshold, + podLister: podInformer.Lister(), + podListerSynced: podInformer.Informer().HasSynced, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + nodeQueue: workqueue.NewNamedDelayingQueue("orphaned_pods_nodes"), deletePod: func(namespace, name string) error { klog.Infof("PodGC is force deleting Pod: %v/%v", namespace, name) return kubeClient.CoreV1().Pods(namespace).Delete(name, metav1.NewDeleteOptions(0)) }, } - gcc.podLister = podInformer.Lister() - gcc.podListerSynced = podInformer.Informer().HasSynced - return gcc } @@ -73,9 +86,10 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) { defer utilruntime.HandleCrash() klog.Infof("Starting GC controller") + defer gcc.nodeQueue.ShutDown() defer klog.Infof("Shutting down GC controller") - if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced) { + if !cache.WaitForNamedCacheSync("GC", stop, gcc.podListerSynced, gcc.nodeListerSynced) { return } @@ -87,13 +101,18 @@ func (gcc *PodGCController) Run(stop <-chan struct{}) { func (gcc *PodGCController) gc() { pods, err := gcc.podLister.List(labels.Everything()) if err != nil { - klog.Errorf("Error while listing all Pods: %v", err) + klog.Errorf("Error while listing all pods: %v", err) + return + } + nodes, err := gcc.nodeLister.List(labels.Everything()) + if err != nil { + klog.Errorf("Error while listing all nodes: %v", err) return } if gcc.terminatedPodThreshold > 0 { gcc.gcTerminated(pods) } - gcc.gcOrphaned(pods) + gcc.gcOrphaned(pods, nodes) gcc.gcUnscheduledTerminating(pods) } @@ -118,12 +137,11 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) { if deleteCount > terminatedPodCount { deleteCount = terminatedPodCount } - if deleteCount > 0 { - klog.Infof("garbage collecting %v pods", deleteCount) - } else { + if deleteCount <= 0 { return } + klog.Infof("garbage collecting %v pods", deleteCount) // sort only when necessary sort.Sort(byCreationTimestamp(terminatedPods)) var wait sync.WaitGroup @@ -141,23 +159,26 @@ func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) { } // gcOrphaned deletes pods that are bound to nodes that don't exist. -func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) { +func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod, nodes []*v1.Node) { klog.V(4).Infof("GC'ing orphaned") - // We want to get list of Nodes from the etcd, to make sure that it's as fresh as possible. - nodes, err := gcc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{}) - if err != nil { + existingNodeNames := sets.NewString() + for _, node := range nodes { + existingNodeNames.Insert(node.Name) + } + // Add newly found unknown nodes to quarantine + for _, pod := range pods { + if pod.Spec.NodeName != "" && !existingNodeNames.Has(pod.Spec.NodeName) { + gcc.nodeQueue.AddAfter(pod.Spec.NodeName, quarantineTime) + } + } + // Check if nodes are still missing after quarantine period + deletedNodesNames, quit := gcc.discoverDeletedNodes(existingNodeNames) + if quit { return } - nodeNames := sets.NewString() - for i := range nodes.Items { - nodeNames.Insert(nodes.Items[i].Name) - } - + // Delete orphaned pods for _, pod := range pods { - if pod.Spec.NodeName == "" { - continue - } - if nodeNames.Has(pod.Spec.NodeName) { + if !deletedNodesNames.Has(pod.Spec.NodeName) { continue } klog.V(2).Infof("Found orphaned Pod %v/%v assigned to the Node %v. Deleting.", pod.Namespace, pod.Name, pod.Spec.NodeName) @@ -169,6 +190,37 @@ func (gcc *PodGCController) gcOrphaned(pods []*v1.Pod) { } } +func (gcc *PodGCController) discoverDeletedNodes(existingNodeNames sets.String) (sets.String, bool) { + deletedNodesNames := sets.NewString() + for gcc.nodeQueue.Len() > 0 { + item, quit := gcc.nodeQueue.Get() + if quit { + return nil, true + } + nodeName := item.(string) + if !existingNodeNames.Has(nodeName) { + exists, err := gcc.checkIfNodeExists(nodeName) + switch { + case err != nil: + klog.Errorf("Error while getting node %q: %v", nodeName, err) + // Node will be added back to the queue in the subsequent loop if still needed + case !exists: + deletedNodesNames.Insert(nodeName) + } + } + gcc.nodeQueue.Done(item) + } + return deletedNodesNames, false +} + +func (gcc *PodGCController) checkIfNodeExists(name string) (bool, error) { + _, fetchErr := gcc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{}) + if errors.IsNotFound(fetchErr) { + return false, nil + } + return fetchErr == nil, fetchErr +} + // gcUnscheduledTerminating deletes pods that are terminating and haven't been scheduled to a particular node. func (gcc *PodGCController) gcUnscheduledTerminating(pods []*v1.Pod) { klog.V(4).Infof("GC'ing unscheduled pods which are terminating.") diff --git a/pkg/controller/podgc/gc_controller_test.go b/pkg/controller/podgc/gc_controller_test.go index 73ecd7a8ccf..674582c4fdb 100644 --- a/pkg/controller/podgc/gc_controller_test.go +++ b/pkg/controller/podgc/gc_controller_test.go @@ -21,14 +21,17 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/testutil" ) @@ -47,12 +50,25 @@ func (*FakeController) LastSyncResourceVersion() string { func alwaysReady() bool { return true } -func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) (*PodGCController, coreinformers.PodInformer) { +func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) (*PodGCController, coreinformers.PodInformer, coreinformers.NodeInformer) { informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc()) podInformer := informerFactory.Core().V1().Pods() - controller := NewPodGC(kubeClient, podInformer, terminatedPodThreshold) + nodeInformer := informerFactory.Core().V1().Nodes() + controller := NewPodGC(kubeClient, podInformer, nodeInformer, terminatedPodThreshold) controller.podListerSynced = alwaysReady - return controller, podInformer + return controller, podInformer, nodeInformer +} + +func compareStringSetToList(set sets.String, list []string) bool { + for _, item := range list { + if !set.Has(item) { + return false + } + } + if len(list) != len(set) { + return false + } + return true } func TestGCTerminated(t *testing.T) { @@ -113,7 +129,7 @@ func TestGCTerminated(t *testing.T) { for i, test := range testCases { client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node")}}) - gcc, podInformer := NewFromClient(client, test.threshold) + gcc, podInformer, _ := NewFromClient(client, test.threshold) deletedPodNames := make([]string, 0) var lock sync.Mutex gcc.deletePod = func(_, name string) error { @@ -135,90 +151,232 @@ func TestGCTerminated(t *testing.T) { gcc.gc() - pass := true - for _, pod := range deletedPodNames { - if !test.deletedPodNames.Has(pod) { - pass = false - } - } - if len(deletedPodNames) != len(test.deletedPodNames) { - pass = false - } - if !pass { - t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames, deletedPodNames) + if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass { + t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", + i, test.deletedPodNames.List(), deletedPodNames) } } } -func TestGCOrphaned(t *testing.T) { - type nameToPhase struct { - name string - phase v1.PodPhase +func makePod(name string, nodeName string, phase v1.PodPhase) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1.PodSpec{NodeName: nodeName}, + Status: v1.PodStatus{Phase: phase}, } +} +func waitForAdded(q workqueue.DelayingInterface, depth int) error { + return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) { + if q.Len() == depth { + return true, nil + } + + return false, nil + }) +} + +func TestGCOrphaned(t *testing.T) { testCases := []struct { - pods []nameToPhase - threshold int - deletedPodNames sets.String + name string + initialClientNodes []*v1.Node + initialInformerNodes []*v1.Node + delay time.Duration + addedClientNodes []*v1.Node + deletedClientNodes []*v1.Node + addedInformerNodes []*v1.Node + deletedInformerNodes []*v1.Node + pods []*v1.Pod + itemsInQueue int + deletedPodNames sets.String }{ { - pods: []nameToPhase{ - {name: "a", phase: v1.PodFailed}, - {name: "b", phase: v1.PodSucceeded}, + name: "nodes present in lister", + initialInformerNodes: []*v1.Node{ + testutil.NewNode("existing1"), + testutil.NewNode("existing2"), }, - threshold: 0, + delay: 2 * quarantineTime, + pods: []*v1.Pod{ + makePod("a", "existing1", v1.PodRunning), + makePod("b", "existing2", v1.PodFailed), + makePod("c", "existing2", v1.PodSucceeded), + }, + itemsInQueue: 0, + deletedPodNames: sets.NewString(), + }, + { + name: "nodes present in client", + initialClientNodes: []*v1.Node{ + testutil.NewNode("existing1"), + testutil.NewNode("existing2"), + }, + delay: 2 * quarantineTime, + pods: []*v1.Pod{ + makePod("a", "existing1", v1.PodRunning), + makePod("b", "existing2", v1.PodFailed), + makePod("c", "existing2", v1.PodSucceeded), + }, + itemsInQueue: 2, + deletedPodNames: sets.NewString(), + }, + { + name: "no nodes", + delay: 2 * quarantineTime, + pods: []*v1.Pod{ + makePod("a", "deleted", v1.PodFailed), + makePod("b", "deleted", v1.PodSucceeded), + }, + itemsInQueue: 1, deletedPodNames: sets.NewString("a", "b"), }, { - pods: []nameToPhase{ - {name: "a", phase: v1.PodRunning}, + name: "quarantine not finished", + delay: quarantineTime / 2, + pods: []*v1.Pod{ + makePod("a", "deleted", v1.PodFailed), }, - threshold: 1, + itemsInQueue: 0, + deletedPodNames: sets.NewString(), + }, + { + name: "wrong nodes", + initialInformerNodes: []*v1.Node{testutil.NewNode("existing")}, + delay: 2 * quarantineTime, + pods: []*v1.Pod{ + makePod("a", "deleted", v1.PodRunning), + }, + itemsInQueue: 1, deletedPodNames: sets.NewString("a"), }, + { + name: "some nodes missing", + initialInformerNodes: []*v1.Node{testutil.NewNode("existing")}, + delay: 2 * quarantineTime, + pods: []*v1.Pod{ + makePod("a", "deleted", v1.PodFailed), + makePod("b", "existing", v1.PodFailed), + makePod("c", "deleted", v1.PodSucceeded), + makePod("d", "deleted", v1.PodRunning), + }, + itemsInQueue: 1, + deletedPodNames: sets.NewString("a", "c", "d"), + }, + { + name: "node added to client after quarantine", + delay: 2 * quarantineTime, + addedClientNodes: []*v1.Node{testutil.NewNode("node")}, + pods: []*v1.Pod{ + makePod("a", "node", v1.PodRunning), + }, + itemsInQueue: 1, + deletedPodNames: sets.NewString(), + }, + { + name: "node added to informer after quarantine", + delay: 2 * quarantineTime, + addedInformerNodes: []*v1.Node{testutil.NewNode("node")}, + pods: []*v1.Pod{ + makePod("a", "node", v1.PodFailed), + }, + itemsInQueue: 1, + deletedPodNames: sets.NewString(), + }, + { + // It shouldn't happen that client will be lagging behind informer. + // This test case is more a sanity check. + name: "node deleted from client after quarantine", + initialClientNodes: []*v1.Node{testutil.NewNode("node")}, + delay: 2 * quarantineTime, + deletedClientNodes: []*v1.Node{testutil.NewNode("node")}, + pods: []*v1.Pod{ + makePod("a", "node", v1.PodFailed), + }, + itemsInQueue: 1, + deletedPodNames: sets.NewString("a"), + }, + { + name: "node deleted from informer after quarantine", + initialInformerNodes: []*v1.Node{testutil.NewNode("node")}, + delay: 2 * quarantineTime, + deletedInformerNodes: []*v1.Node{testutil.NewNode("node")}, + pods: []*v1.Pod{ + makePod("a", "node", v1.PodSucceeded), + }, + itemsInQueue: 0, + deletedPodNames: sets.NewString(), + }, } - for i, test := range testCases { - client := fake.NewSimpleClientset() - gcc, podInformer := NewFromClient(client, test.threshold) - deletedPodNames := make([]string, 0) - var lock sync.Mutex - gcc.deletePod = func(_, name string) error { - lock.Lock() - defer lock.Unlock() - deletedPodNames = append(deletedPodNames, name) - return nil - } - - creationTime := time.Unix(0, 0) - for _, pod := range test.pods { - creationTime = creationTime.Add(1 * time.Hour) - podInformer.Informer().GetStore().Add(&v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime}}, - Status: v1.PodStatus{Phase: pod.phase}, - Spec: v1.PodSpec{NodeName: "node"}, - }) - } - - pods, err := podInformer.Lister().List(labels.Everything()) - if err != nil { - t.Errorf("Error while listing all Pods: %v", err) - return - } - gcc.gcOrphaned(pods) - - pass := true - for _, pod := range deletedPodNames { - if !test.deletedPodNames.Has(pod) { - pass = false + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + nodeList := &v1.NodeList{} + for _, node := range test.initialClientNodes { + nodeList.Items = append(nodeList.Items, *node) } - } - if len(deletedPodNames) != len(test.deletedPodNames) { - pass = false - } - if !pass { - t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames, deletedPodNames) - } + client := fake.NewSimpleClientset(nodeList) + gcc, podInformer, nodeInformer := NewFromClient(client, -1) + for _, node := range test.initialInformerNodes { + nodeInformer.Informer().GetStore().Add(node) + } + for _, pod := range test.pods { + podInformer.Informer().GetStore().Add(pod) + } + // Overwrite queue + fakeClock := clock.NewFakeClock(time.Now()) + gcc.nodeQueue.ShutDown() + gcc.nodeQueue = workqueue.NewDelayingQueueWithCustomClock(fakeClock, "podgc_test_queue") + + deletedPodNames := make([]string, 0) + var lock sync.Mutex + gcc.deletePod = func(_, name string) error { + lock.Lock() + defer lock.Unlock() + deletedPodNames = append(deletedPodNames, name) + return nil + } + + // First GC of orphaned pods + gcc.gc() + if len(deletedPodNames) > 0 { + t.Errorf("no pods should be deleted at this point.\n\tactual: %v", deletedPodNames) + } + + // Move clock forward + fakeClock.Step(test.delay) + // Wait for queue goroutine to process items + if test.itemsInQueue > 0 { + err := waitForAdded(gcc.nodeQueue, test.itemsInQueue) + if err != nil { + t.Errorf("wrong number of items in the node queue.\n\texpected: %v\n\tactual: %v", + test.itemsInQueue, gcc.nodeQueue.Len()) + } + } + + // Execute planned nodes changes + for _, node := range test.addedClientNodes { + client.CoreV1().Nodes().Create(node) + } + for _, node := range test.deletedClientNodes { + client.CoreV1().Nodes().Delete(node.Name, &metav1.DeleteOptions{}) + } + for _, node := range test.addedInformerNodes { + nodeInformer.Informer().GetStore().Add(node) + } + for _, node := range test.deletedInformerNodes { + nodeInformer.Informer().GetStore().Delete(node) + } + + // Actual pod deletion + gcc.gc() + + if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass { + t.Errorf("pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", + test.deletedPodNames.List(), deletedPodNames) + } + }) } } @@ -257,7 +415,7 @@ func TestGCUnscheduledTerminating(t *testing.T) { for i, test := range testCases { client := fake.NewSimpleClientset() - gcc, podInformer := NewFromClient(client, -1) + gcc, podInformer, _ := NewFromClient(client, -1) deletedPodNames := make([]string, 0) var lock sync.Mutex gcc.deletePod = func(_, name string) error { @@ -285,17 +443,9 @@ func TestGCUnscheduledTerminating(t *testing.T) { } gcc.gcUnscheduledTerminating(pods) - pass := true - for _, pod := range deletedPodNames { - if !test.deletedPodNames.Has(pod) { - pass = false - } - } - if len(deletedPodNames) != len(test.deletedPodNames) { - pass = false - } - if !pass { - t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v, test: %v", i, test.deletedPodNames, deletedPodNames, test.name) + if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass { + t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v, test: %v", + i, test.deletedPodNames.List(), deletedPodNames, test.name) } } } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index 08d57be51a1..8ae39744b6e 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -255,7 +255,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "pod-garbage-collector"}, Rules: []rbacv1.PolicyRule{ rbacv1helpers.NewRule("list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), - rbacv1helpers.NewRule("list").Groups(legacyGroup).Resources("nodes").RuleOrDie(), + rbacv1helpers.NewRule("get", "list").Groups(legacyGroup).Resources("nodes").RuleOrDie(), }, }) addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml index 7a76381fc61..969d81fe10b 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/controller-roles.yaml @@ -872,6 +872,7 @@ items: resources: - nodes verbs: + - get - list - apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go index 3868bcac4d2..e1ab76ea218 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue.go @@ -35,14 +35,17 @@ type DelayingInterface interface { // NewDelayingQueue constructs a new workqueue with delayed queuing ability func NewDelayingQueue() DelayingInterface { - return newDelayingQueue(clock.RealClock{}, "") + return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") } +// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability func NewNamedDelayingQueue(name string) DelayingInterface { - return newDelayingQueue(clock.RealClock{}, name) + return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) } -func newDelayingQueue(clock clock.Clock, name string) DelayingInterface { +// NewDelayingQueueWithCustomClock constructs a new named workqueue +// with ability to inject real or fake clock for testing purposes +func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface { ret := &delayingType{ Interface: NewNamed(name), clock: clock, diff --git a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go index 4e427c6b081..2af8ba3a6a1 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go +++ b/staging/src/k8s.io/client-go/util/workqueue/delaying_queue_test.go @@ -29,7 +29,7 @@ import ( func TestSimpleQueue(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock, "") + q := NewDelayingQueueWithCustomClock(fakeClock, "") first := "foo" @@ -71,7 +71,7 @@ func TestSimpleQueue(t *testing.T) { func TestDeduping(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock, "") + q := NewDelayingQueueWithCustomClock(fakeClock, "") first := "foo" @@ -130,7 +130,7 @@ func TestDeduping(t *testing.T) { func TestAddTwoFireEarly(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock, "") + q := NewDelayingQueueWithCustomClock(fakeClock, "") first := "foo" second := "bar" @@ -179,7 +179,7 @@ func TestAddTwoFireEarly(t *testing.T) { func TestCopyShifting(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock, "") + q := NewDelayingQueueWithCustomClock(fakeClock, "") first := "foo" second := "bar" @@ -217,7 +217,7 @@ func TestCopyShifting(t *testing.T) { func BenchmarkDelayingQueue_AddAfter(b *testing.B) { fakeClock := clock.NewFakeClock(time.Now()) - q := newDelayingQueue(fakeClock, "") + q := NewDelayingQueueWithCustomClock(fakeClock, "") // Add items for n := 0; n < b.N; n++ {