mirror of https://github.com/k3s-io/kubernetes.git
service controller: store feature gate in local fields for better testability
Signed-off-by: Andrew Sy Kim <kim.andrewsy@gmail.com>
commit 758c25de2f (parent b3419e0ccf)
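The idea, shown here as a minimal standalone sketch (illustrative only, not the controller's actual code): read the feature gate once at construction time and store the results as plain booleans on the struct, so unit tests can set those fields directly instead of mutating the global DefaultFeatureGate through featuregatetesting helpers. The feature names and the controller/newController identifiers below are made up for the example; only the k8s.io/component-base/featuregate API is real.

// sketch.go — hedged example of the "snapshot the gate into local fields" pattern.
package main

import (
	"fmt"

	"k8s.io/component-base/featuregate"
)

// Hypothetical feature names, standing in for the controller's
// legacyNodeRoleBehaviorFeature and serviceNodeExclusionFeature.
const (
	legacyNodeRoleBehavior featuregate.Feature = "LegacyNodeRoleBehavior"
	serviceNodeExclusion   featuregate.Feature = "ServiceNodeExclusion"
)

// controller keeps feature state in plain fields rather than consulting a
// global gate at every call site.
type controller struct {
	legacyNodeRoleFeatureEnabled       bool
	serviceNodeExclusionFeatureEnabled bool
}

// newController snapshots the gate once; production code would pass
// something like utilfeature.DefaultFeatureGate here.
func newController(gate featuregate.FeatureGate) *controller {
	return &controller{
		legacyNodeRoleFeatureEnabled:       gate.Enabled(legacyNodeRoleBehavior),
		serviceNodeExclusionFeatureEnabled: gate.Enabled(serviceNodeExclusion),
	}
}

func main() {
	// Production-style path: register features on a gate and read them once.
	gate := featuregate.NewFeatureGate()
	if err := gate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		legacyNodeRoleBehavior: {Default: true, PreRelease: featuregate.Beta},
		serviceNodeExclusion:   {Default: false, PreRelease: featuregate.Alpha},
	}); err != nil {
		panic(err)
	}
	c := newController(gate)
	fmt.Println(c.legacyNodeRoleFeatureEnabled, c.serviceNodeExclusionFeatureEnabled)

	// Test-style path: no gate mutation needed, just set the fields.
	testC := &controller{serviceNodeExclusionFeatureEnabled: true}
	fmt.Println(testC.legacyNodeRoleFeatureEnabled, testC.serviceNodeExclusionFeatureEnabled)
}

The trade-off is that gate changes made after construction are not observed, which is acceptable here because the controller reads the gates only once per lifetime anyway.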
@@ -89,6 +89,7 @@ func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cl
ctx.SharedInformers.Core().V1().Services(),
ctx.SharedInformers.Core().V1().Nodes(),
ctx.ComponentConfig.KubeCloudShared.ClusterName,
utilfeature.DefaultFeatureGate,
)
if err != nil {
// This error shouldn't fail. It lives like this as a legacy.
@@ -83,6 +83,7 @@ func startServiceController(ctx ControllerContext) (http.Handler, bool, error) {
ctx.InformerFactory.Core().V1().Services(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ComponentConfig.KubeCloudShared.ClusterName,
utilfeature.DefaultFeatureGate,
)
if err != nil {
// This error shouldn't fail. It lives like this as a legacy.
@@ -15,7 +15,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
@@ -26,6 +25,7 @@ go_library(
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/component-base/featuregate:go_default_library",
"//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
@@ -43,14 +43,16 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider/fake:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@@ -29,7 +29,6 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@@ -40,6 +39,7 @@ import (
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
servicehelper "k8s.io/cloud-provider/service/helpers"
"k8s.io/component-base/featuregate"
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog"
)
@@ -110,6 +110,9 @@ type Controller struct {
nodeListerSynced cache.InformerSynced
// services that need to be synced
queue workqueue.RateLimitingInterface
// feature gates stored in local field for better testability
legacyNodeRoleFeatureEnabled bool
serviceNodeExclusionFeatureEnabled bool
}

// New returns a new service controller to keep cloud provider service resources
@@ -120,6 +123,7 @@ func New(
serviceInformer coreinformers.ServiceInformer,
nodeInformer coreinformers.NodeInformer,
clusterName string,
featureGate featuregate.FeatureGate,
) (*Controller, error) {
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
@@ -133,16 +137,18 @@ func New(
}

s := &Controller{
cloud: cloud,
knownHosts: []*v1.Node{},
kubeClient: kubeClient,
clusterName: clusterName,
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
cloud: cloud,
knownHosts: []*v1.Node{},
kubeClient: kubeClient,
clusterName: clusterName,
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
legacyNodeRoleFeatureEnabled: featureGate.Enabled(legacyNodeRoleBehaviorFeature),
serviceNodeExclusionFeatureEnabled: featureGate.Enabled(serviceNodeExclusionFeature),
}

serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -188,6 +194,7 @@ func New(
if err := s.init(); err != nil {
return nil, err
}

return s, nil
}
@@ -383,7 +390,7 @@ func (s *Controller) syncLoadBalancerIfNeeded(service *v1.Service, key string) (
}

func (s *Controller) ensureLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
nodes, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
nodes, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
if err != nil {
return nil, err
}
@@ -612,7 +619,7 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
return nodeNames(x).Equal(nodeNames(y))
}

func getNodeConditionPredicate() NodeConditionPredicate {
func (s *Controller) getNodeConditionPredicate() NodeConditionPredicate {
return func(node *v1.Node) bool {
// We add the master to the node list, but its unschedulable. So we use this to filter
// the master.
@@ -620,14 +627,14 @@ func getNodeConditionPredicate() NodeConditionPredicate {
return false
}

if utilfeature.DefaultFeatureGate.Enabled(legacyNodeRoleBehaviorFeature) {
if s.legacyNodeRoleFeatureEnabled {
// As of 1.6, we will taint the master, but not necessarily mark it unschedulable.
// Recognize nodes labeled as master, and filter them also, as we were doing previously.
if _, hasMasterRoleLabel := node.Labels[labelNodeRoleMaster]; hasMasterRoleLabel {
return false
}
}
if utilfeature.DefaultFeatureGate.Enabled(serviceNodeExclusionFeature) {
if s.serviceNodeExclusionFeatureEnabled {
if _, hasExcludeBalancerLabel := node.Labels[labelNodeRoleExcludeBalancer]; hasExcludeBalancerLabel {
return false
}
@@ -654,7 +661,7 @@ func getNodeConditionPredicate() NodeConditionPredicate {
func (s *Controller) nodeSyncLoop() {
s.knownHostsLock.Lock()
defer s.knownHostsLock.Unlock()
newHosts, err := listWithPredicate(s.nodeLister, getNodeConditionPredicate())
newHosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
if err != nil {
runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err))
return
@@ -33,14 +33,16 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
fakecloud "k8s.io/cloud-provider/fake"
servicehelper "k8s.io/cloud-provider/service/helpers"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog"
)

const region = "us-central"
@@ -70,21 +72,43 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
cloud := &fakecloud.Cloud{}
cloud.Region = region

client := fake.NewSimpleClientset()
kubeClient := fake.NewSimpleClientset()

informerFactory := informers.NewSharedInformerFactory(client, 0)
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
serviceInformer := informerFactory.Core().V1().Services()
nodeInformer := informerFactory.Core().V1().Nodes()

controller, _ := New(cloud, client, serviceInformer, nodeInformer, "test-cluster")
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(klog.Infof)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})

controller := &Controller{
cloud: cloud,
knownHosts: []*v1.Node{},
kubeClient: kubeClient,
clusterName: "test-cluster",
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
}

balancer, _ := cloud.LoadBalancer()
controller.balancer = balancer

controller.serviceLister = serviceInformer.Lister()

controller.nodeListerSynced = alwaysReady
controller.serviceListerSynced = alwaysReady
controller.eventRecorder = record.NewFakeRecorder(100)

cloud.Calls = nil // ignore any cloud calls made in init()
client.ClearActions() // ignore any client calls made in init()
cloud.Calls = nil // ignore any cloud calls made in init()
kubeClient.ClearActions() // ignore any client calls made in init()

return controller, cloud, client
return controller, cloud, kubeClient
}

// TODO(@MrHohn): Verify the end state when below issue is resolved:
@@ -1367,10 +1391,12 @@ func Test_getNodeConditionPredicate(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, serviceNodeExclusionFeature, tt.enableExclusion)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, legacyNodeRoleBehaviorFeature, tt.enableLegacy)()
c := &Controller{
legacyNodeRoleFeatureEnabled: tt.enableLegacy,
serviceNodeExclusionFeatureEnabled: tt.enableExclusion,
}

if result := getNodeConditionPredicate()(tt.input); result != tt.want {
if result := c.getNodeConditionPredicate()(tt.input); result != tt.want {
t.Errorf("getNodeConditionPredicate() = %v, want %v", result, tt.want)
}
})