Merge pull request #90987 from andrewsykim/service-controller-fixup

service controller: clean up unit tests
Commit 977aeab3e5 by Kubernetes Prow Robot, 2020-05-13 00:19:13 -07:00 (committed by GitHub).
5 changed files with 67 additions and 77 deletions
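
The substance of the change: rather than having the service controller query the process-global feature gate (utilfeature.DefaultFeatureGate) every time its node-filtering predicate runs, New() now accepts a featuregate.FeatureGate argument, evaluates the two relevant gates once, and caches the answers in plain bool fields on the Controller. Tests can then set those bools directly instead of mutating global state. A minimal stand-alone sketch of that pattern, with names simplified for illustration (this is not code from the PR):

package main

import "fmt"

// gate stands in for the Enabled half of featuregate.FeatureGate.
type gate interface {
	Enabled(feature string) bool
}

// controller mirrors the shape of the real Controller: gate values are
// read once at construction and cached in plain fields.
type controller struct {
	legacyNodeRoleEnabled bool
	nodeExclusionEnabled  bool
}

func newController(g gate) *controller {
	return &controller{
		legacyNodeRoleEnabled: g.Enabled("LegacyNodeRoleBehavior"),
		nodeExclusionEnabled:  g.Enabled("ServiceNodeExclusion"),
	}
}

// mapGate is a trivial test double; production code would pass the real
// feature gate instead.
type mapGate map[string]bool

func (m mapGate) Enabled(f string) bool { return m[f] }

func main() {
	c := newController(mapGate{"LegacyNodeRoleBehavior": true})
	fmt.Println(c.legacyNodeRoleEnabled, c.nodeExclusionEnabled) // true false
}

The trade-off, visible in the diff below, is that gate changes made after construction are not picked up — acceptable here as long as these two gates are not toggled at runtime.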


@@ -89,6 +89,7 @@ func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cl
 		ctx.SharedInformers.Core().V1().Services(),
 		ctx.SharedInformers.Core().V1().Nodes(),
 		ctx.ComponentConfig.KubeCloudShared.ClusterName,
+		utilfeature.DefaultFeatureGate,
 	)
 	if err != nil {
 		// This error shouldn't fail. It lives like this as a legacy.


@@ -83,6 +83,7 @@ func startServiceController(ctx ControllerContext) (http.Handler, bool, error) {
 		ctx.InformerFactory.Core().V1().Services(),
 		ctx.InformerFactory.Core().V1().Nodes(),
 		ctx.ComponentConfig.KubeCloudShared.ClusterName,
+		utilfeature.DefaultFeatureGate,
 	)
 	if err != nil {
 		// This error shouldn't fail. It lives like this as a legacy.


@@ -15,7 +15,6 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
-        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
@@ -26,6 +25,7 @@ go_library(
         "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//staging/src/k8s.io/cloud-provider:go_default_library",
         "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
+        "//staging/src/k8s.io/component-base/featuregate:go_default_library",
         "//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library",
         "//vendor/k8s.io/klog:go_default_library",
     ],
@@ -36,7 +36,6 @@ go_test(
     srcs = ["controller_test.go"],
     embed = [":go_default_library"],
     deps = [
-        "//pkg/controller:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@@ -44,14 +43,16 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
-        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
+        "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
         "//staging/src/k8s.io/client-go/testing:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
+        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//staging/src/k8s.io/cloud-provider/fake:go_default_library",
         "//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
-        "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
+        "//vendor/k8s.io/klog:go_default_library",
     ],
 )


@@ -29,7 +29,6 @@ import (
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -40,6 +39,7 @@ import (
 	"k8s.io/client-go/util/workqueue"
 	cloudprovider "k8s.io/cloud-provider"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
+	"k8s.io/component-base/featuregate"
 	"k8s.io/component-base/metrics/prometheus/ratelimiter"
 	"k8s.io/klog"
 )
@@ -110,6 +110,9 @@ type Controller struct {
 	nodeListerSynced cache.InformerSynced
 	// services that need to be synced
 	queue workqueue.RateLimitingInterface
+	// feature gates stored in local field for better testability
+	legacyNodeRoleFeatureEnabled       bool
+	serviceNodeExclusionFeatureEnabled bool
 }

 // New returns a new service controller to keep cloud provider service resources
@@ -120,6 +123,7 @@ func New(
 	serviceInformer coreinformers.ServiceInformer,
 	nodeInformer coreinformers.NodeInformer,
 	clusterName string,
+	featureGate featuregate.FeatureGate,
 ) (*Controller, error) {
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartLogging(klog.Infof)
@@ -133,16 +137,18 @@ func New(
 	}

 	s := &Controller{
 		cloud:            cloud,
 		knownHosts:       []*v1.Node{},
 		kubeClient:       kubeClient,
 		clusterName:      clusterName,
 		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
 		eventBroadcaster: broadcaster,
 		eventRecorder:    recorder,
 		nodeLister:       nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+		legacyNodeRoleFeatureEnabled:       featureGate.Enabled(legacyNodeRoleBehaviorFeature),
+		serviceNodeExclusionFeatureEnabled: featureGate.Enabled(serviceNodeExclusionFeature),
 	}

 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -202,6 +208,7 @@ func New(
 	if err := s.init(); err != nil {
 		return nil, err
 	}
+
 	return s, nil
 }
@@ -397,7 +404,7 @@ func (s *Controller) syncLoadBalancerIfNeeded(service *v1.Service, key string) (
 }

 func (s *Controller) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
-	nodes, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
+	nodes, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
 	if err != nil {
 		return nil, err
 	}
@@ -626,7 +633,7 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
 	return nodeNames(x).Equal(nodeNames(y))
 }

-func getNodeConditionPredicate() NodeConditionPredicate {
+func (s *Controller) getNodeConditionPredicate() NodeConditionPredicate {
 	return func(node *v1.Node) bool {
 		// We add the master to the node list, but its unschedulable. So we use this to filter
 		// the master.
@@ -634,14 +641,14 @@ func getNodeConditionPredicate() NodeConditionPredicate {
 			return false
 		}

-		if utilfeature.DefaultFeatureGate.Enabled(legacyNodeRoleBehaviorFeature) {
+		if s.legacyNodeRoleFeatureEnabled {
 			// As of 1.6, we will taint the master, but not necessarily mark it unschedulable.
 			// Recognize nodes labeled as master, and filter them also, as we were doing previously.
 			if _, hasMasterRoleLabel := node.Labels[labelNodeRoleMaster]; hasMasterRoleLabel {
 				return false
 			}
 		}

-		if utilfeature.DefaultFeatureGate.Enabled(serviceNodeExclusionFeature) {
+		if s.serviceNodeExclusionFeatureEnabled {
 			if _, hasExcludeBalancerLabel := node.Labels[labelNodeRoleExcludeBalancer]; hasExcludeBalancerLabel {
 				return false
 			}
@@ -692,7 +699,7 @@ func nodeReadyConditionStatus(node *v1.Node) v1.ConditionStatus {
 func (s *Controller) nodeSyncLoop() {
 	s.knownHostsLock.Lock()
 	defer s.knownHostsLock.Unlock()
-	newHosts, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
+	newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
 	if err != nil {
 		runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
 		return
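
With getNodeConditionPredicate now a method on *Controller, either feature branch can be exercised by setting the bool fields on a literal Controller value, with no global gate mutation and no ordering constraints between tests. The updated table-driven Test_getNodeConditionPredicate below does exactly that; spelled out as a single hypothetical case (it would have to live in the controller's own package, next to controller_test.go, to see the unexported identifiers, and it reuses that file's v1/metav1 imports):

func TestPredicateExcludesLabeledNode(t *testing.T) {
	t.Parallel() // safe: no shared feature-gate state is touched

	c := &Controller{serviceNodeExclusionFeatureEnabled: true}
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			// Presence of the label is all the predicate checks for.
			Labels: map[string]string{labelNodeRoleExcludeBalancer: ""},
		},
		Status: v1.NodeStatus{
			Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
		},
	}
	if c.getNodeConditionPredicate()(node) {
		t.Errorf("node with the exclude-balancer label should be filtered out")
	}
}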


@@ -33,16 +33,16 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/kubernetes/scheme"
+	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
 	fakecloud "k8s.io/cloud-provider/fake"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
-	featuregatetesting "k8s.io/component-base/featuregate/testing"
-	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/klog"
 )

 const region = "us-central"
@@ -72,21 +72,43 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
 	cloud := &fakecloud.Cloud{}
 	cloud.Region = region

-	client := fake.NewSimpleClientset()
-	informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
+	kubeClient := fake.NewSimpleClientset()
+	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
 	serviceInformer := informerFactory.Core().V1().Services()
 	nodeInformer := informerFactory.Core().V1().Nodes()

-	controller, _ := New(cloud, client, serviceInformer, nodeInformer, "test-cluster")
+	broadcaster := record.NewBroadcaster()
+	broadcaster.StartLogging(klog.Infof)
+	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
+	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
+
+	controller := &Controller{
+		cloud:            cloud,
+		knownHosts:       []*v1.Node{},
+		kubeClient:       kubeClient,
+		clusterName:      "test-cluster",
+		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
+		eventBroadcaster: broadcaster,
+		eventRecorder:    recorder,
+		nodeLister:       nodeInformer.Lister(),
+		nodeListerSynced: nodeInformer.Informer().HasSynced,
+		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
+	}
+
+	balancer, _ := cloud.LoadBalancer()
+	controller.balancer = balancer
+
+	controller.serviceLister = serviceInformer.Lister()
+
 	controller.nodeListerSynced = alwaysReady
 	controller.serviceListerSynced = alwaysReady
 	controller.eventRecorder = record.NewFakeRecorder(100)

-	cloud.Calls = nil     // ignore any cloud calls made in init()
-	client.ClearActions() // ignore any client calls made in init()
+	cloud.Calls = nil         // ignore any cloud calls made in init()
+	kubeClient.ClearActions() // ignore any client calls made in init()

-	return controller, cloud, client
+	return controller, cloud, kubeClient
 }
// TODO(@MrHohn): Verify the end state when below issue is resolved: // TODO(@MrHohn): Verify the end state when below issue is resolved:
@@ -477,50 +499,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 	}
 }

-func TestGetNodeConditionPredicate(t *testing.T) {
-	tests := []struct {
-		node         v1.Node
-		expectAccept bool
-		name         string
-	}{
-		{
-			node:         v1.Node{},
-			expectAccept: false,
-			name:         "empty",
-		},
-		{
-			node: v1.Node{
-				Status: v1.NodeStatus{
-					Conditions: []v1.NodeCondition{
-						{Type: v1.NodeReady, Status: v1.ConditionTrue},
-					},
-				},
-			},
-			expectAccept: true,
-			name:         "basic",
-		},
-		{
-			node: v1.Node{
-				Spec: v1.NodeSpec{Unschedulable: true},
-				Status: v1.NodeStatus{
-					Conditions: []v1.NodeCondition{
-						{Type: v1.NodeReady, Status: v1.ConditionTrue},
-					},
-				},
-			},
-			expectAccept: false,
-			name:         "unschedulable",
-		},
-	}
-	pred := getNodeConditionPredicate()
-	for _, test := range tests {
-		accept := pred(&test.node)
-		if accept != test.expectAccept {
-			t.Errorf("Test failed for %s, expected %v, saw %v", test.name, test.expectAccept, accept)
-		}
-	}
-}

 func TestProcessServiceCreateOrUpdate(t *testing.T) {
 	controller, _, client := newController()
@@ -1413,10 +1391,12 @@ func Test_getNodeConditionPredicate(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, serviceNodeExclusionFeature, tt.enableExclusion)()
-			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, legacyNodeRoleBehaviorFeature, tt.enableLegacy)()
+			c := &Controller{
+				legacyNodeRoleFeatureEnabled:       tt.enableLegacy,
+				serviceNodeExclusionFeatureEnabled: tt.enableExclusion,
+			}

-			if result := getNodeConditionPredicate()(tt.input); result != tt.want {
+			if result := c.getNodeConditionPredicate()(tt.input); result != tt.want {
 				t.Errorf("getNodeConditionPredicate() = %v, want %v", result, tt.want)
 			}
 		})