From 758c25de2f116a37369b97458200a7afd3a9282d Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Mon, 11 May 2020 19:42:43 -0400 Subject: [PATCH] service controller: store feature gate in local fields for better testability Signed-off-by: Andrew Sy Kim --- cmd/cloud-controller-manager/app/core.go | 1 + cmd/kube-controller-manager/app/core.go | 1 + pkg/controller/service/BUILD | 8 ++-- pkg/controller/service/controller.go | 39 ++++++++++-------- pkg/controller/service/controller_test.go | 48 +++++++++++++++++------ 5 files changed, 67 insertions(+), 30 deletions(-) diff --git a/cmd/cloud-controller-manager/app/core.go b/cmd/cloud-controller-manager/app/core.go index 449e59c646a..cf016138682 100644 --- a/cmd/cloud-controller-manager/app/core.go +++ b/cmd/cloud-controller-manager/app/core.go @@ -89,6 +89,7 @@ func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cl ctx.SharedInformers.Core().V1().Services(), ctx.SharedInformers.Core().V1().Nodes(), ctx.ComponentConfig.KubeCloudShared.ClusterName, + utilfeature.DefaultFeatureGate, ) if err != nil { // This error shouldn't fail. It lives like this as a legacy. diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 5f518e4c45f..a76063929b6 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -83,6 +83,7 @@ func startServiceController(ctx ControllerContext) (http.Handler, bool, error) { ctx.InformerFactory.Core().V1().Services(), ctx.InformerFactory.Core().V1().Nodes(), ctx.ComponentConfig.KubeCloudShared.ClusterName, + utilfeature.DefaultFeatureGate, ) if err != nil { // This error shouldn't fail. It lives like this as a legacy. diff --git a/pkg/controller/service/BUILD b/pkg/controller/service/BUILD index ff870c6bf8d..416212089ae 100644 --- a/pkg/controller/service/BUILD +++ b/pkg/controller/service/BUILD @@ -15,7 +15,6 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", @@ -26,6 +25,7 @@ go_library( "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider:go_default_library", "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library", + "//staging/src/k8s.io/component-base/featuregate:go_default_library", "//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -43,14 +43,16 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/cloud-provider/fake:go_default_library", "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library", - "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/pkg/controller/service/controller.go b/pkg/controller/service/controller.go index 1348596c178..900fe4c24d2 100644 --- a/pkg/controller/service/controller.go +++ b/pkg/controller/service/controller.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -40,6 +39,7 @@ import ( "k8s.io/client-go/util/workqueue" cloudprovider "k8s.io/cloud-provider" servicehelper "k8s.io/cloud-provider/service/helpers" + "k8s.io/component-base/featuregate" "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog" ) @@ -110,6 +110,9 @@ type Controller struct { nodeListerSynced cache.InformerSynced // services that need to be synced queue workqueue.RateLimitingInterface + // feature gates stored in local field for better testability + legacyNodeRoleFeatureEnabled bool + serviceNodeExclusionFeatureEnabled bool } // New returns a new service controller to keep cloud provider service resources @@ -120,6 +123,7 @@ func New( serviceInformer coreinformers.ServiceInformer, nodeInformer coreinformers.NodeInformer, clusterName string, + featureGate featuregate.FeatureGate, ) (*Controller, error) { broadcaster := record.NewBroadcaster() broadcaster.StartLogging(klog.Infof) @@ -133,16 +137,18 @@ func New( } s := &Controller{ - cloud: cloud, - knownHosts: []*v1.Node{}, - kubeClient: kubeClient, - clusterName: clusterName, - cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, - eventBroadcaster: broadcaster, - eventRecorder: recorder, - nodeLister: nodeInformer.Lister(), - nodeListerSynced: nodeInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + cloud: cloud, + knownHosts: []*v1.Node{}, + kubeClient: kubeClient, + clusterName: clusterName, + cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, + eventBroadcaster: broadcaster, + eventRecorder: recorder, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + legacyNodeRoleFeatureEnabled: featureGate.Enabled(legacyNodeRoleBehaviorFeature), + serviceNodeExclusionFeatureEnabled: featureGate.Enabled(serviceNodeExclusionFeature), } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -188,6 +194,7 @@ func New( if err := s.init(); err != nil { return nil, err } + return s, nil } @@ -383,7 +390,7 @@ func (s *Controller) syncLoadBalancerIfNeeded(service *v1.Service, key string) ( } func (s *Controller) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) { - nodes, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate()) + nodes, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) if err != nil { return nil, err } @@ -612,7 +619,7 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool { return nodeNames(x).Equal(nodeNames(y)) } -func getNodeConditionPredicate() NodeConditionPredicate { +func (s *Controller) getNodeConditionPredicate() NodeConditionPredicate { return func(node *v1.Node) bool { // We add the master to the node list, but its unschedulable. So we use this to filter // the master. @@ -620,14 +627,14 @@ func getNodeConditionPredicate() NodeConditionPredicate { return false } - if utilfeature.DefaultFeatureGate.Enabled(legacyNodeRoleBehaviorFeature) { + if s.legacyNodeRoleFeatureEnabled { // As of 1.6, we will taint the master, but not necessarily mark it unschedulable. // Recognize nodes labeled as master, and filter them also, as we were doing previously. if _, hasMasterRoleLabel := node.Labels[labelNodeRoleMaster]; hasMasterRoleLabel { return false } } - if utilfeature.DefaultFeatureGate.Enabled(serviceNodeExclusionFeature) { + if s.serviceNodeExclusionFeatureEnabled { if _, hasExcludeBalancerLabel := node.Labels[labelNodeRoleExcludeBalancer]; hasExcludeBalancerLabel { return false } @@ -654,7 +661,7 @@ func getNodeConditionPredicate() NodeConditionPredicate { func (s *Controller) nodeSyncLoop() { s.knownHostsLock.Lock() defer s.knownHostsLock.Unlock() - newHosts, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate()) + newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) if err != nil { runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) return diff --git a/pkg/controller/service/controller_test.go b/pkg/controller/service/controller_test.go index 69ef5c72a7d..f37505ee119 100644 --- a/pkg/controller/service/controller_test.go +++ b/pkg/controller/service/controller_test.go @@ -33,14 +33,16 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" fakecloud "k8s.io/cloud-provider/fake" servicehelper "k8s.io/cloud-provider/service/helpers" - featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog" ) const region = "us-central" @@ -70,21 +72,43 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { cloud := &fakecloud.Cloud{} cloud.Region = region - client := fake.NewSimpleClientset() + kubeClient := fake.NewSimpleClientset() - informerFactory := informers.NewSharedInformerFactory(client, 0) + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) serviceInformer := informerFactory.Core().V1().Services() nodeInformer := informerFactory.Core().V1().Nodes() - controller, _ := New(cloud, client, serviceInformer, nodeInformer, "test-cluster") + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(klog.Infof) + broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"}) + + controller := &Controller{ + cloud: cloud, + knownHosts: []*v1.Node{}, + kubeClient: kubeClient, + clusterName: "test-cluster", + cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, + eventBroadcaster: broadcaster, + eventRecorder: recorder, + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + } + + balancer, _ := cloud.LoadBalancer() + controller.balancer = balancer + + controller.serviceLister = serviceInformer.Lister() + controller.nodeListerSynced = alwaysReady controller.serviceListerSynced = alwaysReady controller.eventRecorder = record.NewFakeRecorder(100) - cloud.Calls = nil // ignore any cloud calls made in init() - client.ClearActions() // ignore any client calls made in init() + cloud.Calls = nil // ignore any cloud calls made in init() + kubeClient.ClearActions() // ignore any client calls made in init() - return controller, cloud, client + return controller, cloud, kubeClient } // TODO(@MrHohn): Verify the end state when below issue is resolved: @@ -1367,10 +1391,12 @@ func Test_getNodeConditionPredicate(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, serviceNodeExclusionFeature, tt.enableExclusion)() - defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, legacyNodeRoleBehaviorFeature, tt.enableLegacy)() + c := &Controller{ + legacyNodeRoleFeatureEnabled: tt.enableLegacy, + serviceNodeExclusionFeatureEnabled: tt.enableExclusion, + } - if result := getNodeConditionPredicate()(tt.input); result != tt.want { + if result := c.getNodeConditionPredicate()(tt.input); result != tt.want { t.Errorf("getNodeConditionPredicate() = %v, want %v", result, tt.want) } })