Merge pull request #119271 from aojea/servicedefault_informer

controlplane: kubernetes.default controller stop polling
This commit is contained in:
Kubernetes Prow Robot 2023-07-13 05:52:05 -07:00 committed by GitHub
commit 2d6d2b938a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 113 additions and 88 deletions

View File

@ -31,7 +31,10 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
v1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
@ -47,7 +50,9 @@ const (
type Controller struct {
Config
client kubernetes.Interface
client kubernetes.Interface
serviceLister v1listers.ServiceLister
serviceSynced cache.InformerSynced
lock sync.Mutex
stopCh chan struct{} // closed by Stop()
@ -67,17 +72,23 @@ type Config struct {
}
// New returns a controller for watching the kubernetes service endpoints.
func New(config Config, client kubernetes.Interface) *Controller {
func New(config Config, client kubernetes.Interface, serviceInformer v1informers.ServiceInformer) *Controller {
return &Controller{
Config: config,
client: client,
stopCh: make(chan struct{}),
Config: config,
client: client,
serviceLister: serviceInformer.Lister(),
serviceSynced: serviceInformer.Informer().HasSynced,
stopCh: make(chan struct{}),
}
}
// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, c.serviceSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
// Reconcile during first run removing itself until server is ready.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
@ -153,20 +164,6 @@ func (c *Controller) Run(ch <-chan struct{}) {
// UpdateKubernetesService attempts to update the default Kube service.
func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// Update service & endpoint records.
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master.
if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: metav1.NamespaceDefault,
Namespace: "",
},
}, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) {
return err
}
}
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https")
if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
return err
@ -209,8 +206,9 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
if s, err := c.serviceLister.Services(metav1.NamespaceDefault).Get(serviceName); err == nil {
// The service already exists.
// This path is no executed since 1.17 2a9a9fa, keeping it in case it needs to be revisited
if reconcile {
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)

View File

@ -19,12 +19,16 @@ package kubernetesservice
import (
"reflect"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
v1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
v1listers "k8s.io/client-go/listers/core/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
netutils "k8s.io/utils/net"
)
@ -65,29 +69,33 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
},
}
for _, test := range createTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
master.client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "create" {
creates = append(creates, action.(core.CreateAction))
}
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creations: %v", test.testName, creates)
} else {
obj := creates[0].GetObject()
if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
t.Run(test.testName, func(t *testing.T) {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
master.serviceLister = v1listers.NewServiceLister(serviceStore)
master.client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "create" {
creates = append(creates, action.(core.CreateAction))
}
}
}
if test.expectCreate == nil && len(creates) > 1 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creations: %v", test.testName, creates)
} else {
obj := creates[0].GetObject()
if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectCreate == nil && len(creates) > 1 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
})
}
reconcileTests := []struct {
@ -347,32 +355,41 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
},
}
for _, test := range reconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
t.Run(test.testName, func(t *testing.T) {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
serviceInformer := v1informers.NewServiceInformer(fakeClient, metav1.NamespaceDefault, 12*time.Hour, cache.Indexers{})
serviceStore := serviceInformer.GetIndexer()
err := serviceStore.Add(test.service)
if err != nil {
t.Fatalf("unexpected error adding service %v to the store: %v", test.service, err)
}
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
master.serviceLister = v1listers.NewServiceLister(serviceStore)
master.client = fakeClient
err = master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
})
}
nonReconcileTests := []struct {
@ -406,31 +423,41 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
},
}
for _, test := range nonReconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
t.Run(test.testName, func(t *testing.T) {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
serviceInformer := v1informers.NewServiceInformer(fakeClient, metav1.NamespaceDefault, 12*time.Hour, cache.Indexers{})
serviceStore := serviceInformer.GetIndexer()
err := serviceStore.Add(test.service)
if err != nil {
t.Fatalf("unexpected error adding service %v to the store: %v", test.service, err)
}
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
master.serviceLister = v1listers.NewServiceLister(serviceStore)
err = master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
})
}
}

View File

@ -483,7 +483,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
}, clientset)
}, clientset, c.ExtraConfig.VersionedInformers.Core().V1().Services())
m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubernetesServiceCtrl.Start(hookContext.StopCh)
return nil