controlplane: kubernetes.default controller stop polling

the kubernetesservice controller is in charge of reconciling the
kubernetes.default service with the first IP in the service CIDR range
and port 443, it also maintains the Endpoints associated to the Service
using the configure EndpointReconciler.

Until now, the controller was creating the default namespace if it
doesn't exist , and creating the kubernetes.default service if it
doesn't exist too. However, it was polling the Service in each loop,
with this change we reuse the apiserver informers to watch the Service
instead of polling.

It also removes the logic to create the default network namespace, since
this is part of the systemnamespaces controller now.

Change-Id: I70954f8e6309e7af8e4b749bf0752168f0ec2c42
Signed-off-by: Antonio Ojea <aojea@google.com>
This commit is contained in:
Antonio Ojea 2023-07-12 22:05:42 +00:00
parent e3fe07e322
commit c5147c91b8
3 changed files with 113 additions and 88 deletions

View File

@ -31,7 +31,10 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
v1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
@ -47,7 +50,9 @@ const (
type Controller struct {
Config
client kubernetes.Interface
client kubernetes.Interface
serviceLister v1listers.ServiceLister
serviceSynced cache.InformerSynced
lock sync.Mutex
stopCh chan struct{} // closed by Stop()
@ -67,17 +72,23 @@ type Config struct {
}
// New returns a controller for watching the kubernetes service endpoints.
func New(config Config, client kubernetes.Interface) *Controller {
func New(config Config, client kubernetes.Interface, serviceInformer v1informers.ServiceInformer) *Controller {
return &Controller{
Config: config,
client: client,
stopCh: make(chan struct{}),
Config: config,
client: client,
serviceLister: serviceInformer.Lister(),
serviceSynced: serviceInformer.Informer().HasSynced,
stopCh: make(chan struct{}),
}
}
// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, c.serviceSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
// Reconcile during first run removing itself until server is ready.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
@ -153,20 +164,6 @@ func (c *Controller) Run(ch <-chan struct{}) {
// UpdateKubernetesService attempts to update the default Kube service.
func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// Update service & endpoint records.
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master.
if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: metav1.NamespaceDefault,
Namespace: "",
},
}, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) {
return err
}
}
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https")
if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
return err
@ -209,8 +206,9 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
if s, err := c.serviceLister.Services(metav1.NamespaceDefault).Get(serviceName); err == nil {
// The service already exists.
// This path is no executed since 1.17 2a9a9fa, keeping it in case it needs to be revisited
if reconcile {
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)

View File

@ -19,12 +19,16 @@ package kubernetesservice
import (
"reflect"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
v1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes/fake"
v1listers "k8s.io/client-go/listers/core/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
netutils "k8s.io/utils/net"
)
@ -65,29 +69,33 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
},
}
for _, test := range createTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
master.client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "create" {
creates = append(creates, action.(core.CreateAction))
}
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creations: %v", test.testName, creates)
} else {
obj := creates[0].GetObject()
if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
t.Run(test.testName, func(t *testing.T) {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
master.serviceLister = v1listers.NewServiceLister(serviceStore)
master.client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "create" {
creates = append(creates, action.(core.CreateAction))
}
}
}
if test.expectCreate == nil && len(creates) > 1 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creations: %v", test.testName, creates)
} else {
obj := creates[0].GetObject()
if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectCreate == nil && len(creates) > 1 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
})
}
reconcileTests := []struct {
@ -347,32 +355,41 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
},
}
for _, test := range reconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
t.Run(test.testName, func(t *testing.T) {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
serviceInformer := v1informers.NewServiceInformer(fakeClient, metav1.NamespaceDefault, 12*time.Hour, cache.Indexers{})
serviceStore := serviceInformer.GetIndexer()
err := serviceStore.Add(test.service)
if err != nil {
t.Fatalf("unexpected error adding service %v to the store: %v", test.service, err)
}
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
master.serviceLister = v1listers.NewServiceLister(serviceStore)
master.client = fakeClient
err = master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
})
}
nonReconcileTests := []struct {
@ -406,31 +423,41 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
},
}
for _, test := range nonReconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
t.Run(test.testName, func(t *testing.T) {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
serviceInformer := v1informers.NewServiceInformer(fakeClient, metav1.NamespaceDefault, 12*time.Hour, cache.Indexers{})
serviceStore := serviceInformer.GetIndexer()
err := serviceStore.Add(test.service)
if err != nil {
t.Fatalf("unexpected error adding service %v to the store: %v", test.service, err)
}
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
master.serviceLister = v1listers.NewServiceLister(serviceStore)
err = master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
})
}
}

View File

@ -483,7 +483,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
}, clientset)
}, clientset, c.ExtraConfig.VersionedInformers.Core().V1().Services())
m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubernetesServiceCtrl.Start(hookContext.StopCh)
return nil