mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 05:03:09 +00:00
Merge pull request #111191 from aojea/controller-client-go
refactor controlplane to use just one client-go
This commit is contained in:
commit
f19a26a22e
@ -33,9 +33,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
"k8s.io/client-go/kubernetes"
|
||||||
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
|
|
||||||
"k8s.io/client-go/rest"
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
||||||
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
|
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
|
||||||
@ -55,10 +53,7 @@ const (
|
|||||||
// "default", "kube-system" and "kube-public" namespaces, and provide the IP
|
// "default", "kube-system" and "kube-public" namespaces, and provide the IP
|
||||||
// repair check on service IPs
|
// repair check on service IPs
|
||||||
type Controller struct {
|
type Controller struct {
|
||||||
ServiceClient corev1client.ServicesGetter
|
client kubernetes.Interface
|
||||||
NamespaceClient corev1client.NamespacesGetter
|
|
||||||
EventClient eventsv1client.EventsV1Interface
|
|
||||||
readyzClient rest.Interface
|
|
||||||
|
|
||||||
ServiceClusterIPRegistry rangeallocation.RangeRegistry
|
ServiceClusterIPRegistry rangeallocation.RangeRegistry
|
||||||
ServiceClusterIPRange net.IPNet
|
ServiceClusterIPRange net.IPNet
|
||||||
@ -89,7 +84,7 @@ type Controller struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewBootstrapController returns a controller for watching the core capabilities of the master
|
// NewBootstrapController returns a controller for watching the core capabilities of the master
|
||||||
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient eventsv1client.EventsV1Interface, readyzClient rest.Interface) (*Controller, error) {
|
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, client kubernetes.Interface) (*Controller, error) {
|
||||||
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
|
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get listener address: %w", err)
|
return nil, fmt.Errorf("failed to get listener address: %w", err)
|
||||||
@ -109,10 +104,7 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
|
|||||||
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
|
systemNamespaces := []string{metav1.NamespaceSystem, metav1.NamespacePublic, corev1.NamespaceNodeLease}
|
||||||
|
|
||||||
return &Controller{
|
return &Controller{
|
||||||
ServiceClient: serviceClient,
|
client: client,
|
||||||
NamespaceClient: nsClient,
|
|
||||||
EventClient: eventClient,
|
|
||||||
readyzClient: readyzClient,
|
|
||||||
|
|
||||||
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
|
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
|
||||||
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
|
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
|
||||||
@ -167,8 +159,8 @@ func (c *Controller) Start() {
|
|||||||
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
|
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
|
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.client.CoreV1(), c.client.EventsV1(), &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
|
||||||
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
|
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.client.CoreV1(), c.client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
|
||||||
|
|
||||||
// We start both repairClusterIPs and repairNodePorts to ensure repair
|
// We start both repairClusterIPs and repairNodePorts to ensure repair
|
||||||
// loops of ClusterIPs and NodePorts.
|
// loops of ClusterIPs and NodePorts.
|
||||||
@ -238,7 +230,7 @@ func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) {
|
|||||||
wait.Until(func() {
|
wait.Until(func() {
|
||||||
// Loop the system namespace list, and create them if they do not exist
|
// Loop the system namespace list, and create them if they do not exist
|
||||||
for _, ns := range c.SystemNamespaces {
|
for _, ns := range c.SystemNamespaces {
|
||||||
if err := createNamespaceIfNeeded(c.NamespaceClient, ns); err != nil {
|
if err := createNamespaceIfNeeded(c.client.CoreV1(), ns); err != nil {
|
||||||
runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
|
runtime.HandleError(fmt.Errorf("unable to create required kubernetes system namespace %s: %v", ns, err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -250,7 +242,7 @@ func (c *Controller) RunKubernetesService(ch chan struct{}) {
|
|||||||
// wait until process is ready
|
// wait until process is ready
|
||||||
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
|
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
|
||||||
var code int
|
var code int
|
||||||
c.readyzClient.Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
|
c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
|
||||||
return code == http.StatusOK, nil
|
return code == http.StatusOK, nil
|
||||||
}, ch)
|
}, ch)
|
||||||
|
|
||||||
@ -270,7 +262,7 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
|
|||||||
// TODO: when it becomes possible to change this stuff,
|
// TODO: when it becomes possible to change this stuff,
|
||||||
// stop polling and start watching.
|
// stop polling and start watching.
|
||||||
// TODO: add endpoints of all replicas, not just the elected master.
|
// TODO: add endpoints of all replicas, not just the elected master.
|
||||||
if err := createNamespaceIfNeeded(c.NamespaceClient, metav1.NamespaceDefault); err != nil {
|
if err := createNamespaceIfNeeded(c.client.CoreV1(), metav1.NamespaceDefault); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -316,12 +308,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
|
|||||||
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
|
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
|
||||||
// doesn't already exist.
|
// doesn't already exist.
|
||||||
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
|
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
|
||||||
if s, err := c.ServiceClient.Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
|
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
|
||||||
// The service already exists.
|
// The service already exists.
|
||||||
if reconcile {
|
if reconcile {
|
||||||
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
|
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
|
||||||
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
|
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
|
||||||
_, err := c.ServiceClient.Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
|
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -345,7 +337,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.ServiceClient.Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
|
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
|
||||||
if errors.IsAlreadyExists(err) {
|
if errors.IsAlreadyExists(err) {
|
||||||
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
|
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
|
||||||
}
|
}
|
||||||
|
@ -24,10 +24,8 @@ import (
|
|||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||||
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/kubernetes/fake"
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
||||||
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
|
|
||||||
"k8s.io/client-go/rest"
|
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
||||||
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
|
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
|
||||||
@ -73,7 +71,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
|
|||||||
for _, test := range createTests {
|
for _, test := range createTests {
|
||||||
master := Controller{}
|
master := Controller{}
|
||||||
fakeClient := fake.NewSimpleClientset()
|
fakeClient := fake.NewSimpleClientset()
|
||||||
master.ServiceClient = fakeClient.CoreV1()
|
master.client = fakeClient
|
||||||
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
|
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
|
||||||
creates := []core.CreateAction{}
|
creates := []core.CreateAction{}
|
||||||
for _, action := range fakeClient.Actions() {
|
for _, action := range fakeClient.Actions() {
|
||||||
@ -355,7 +353,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
|
|||||||
for _, test := range reconcileTests {
|
for _, test := range reconcileTests {
|
||||||
master := Controller{}
|
master := Controller{}
|
||||||
fakeClient := fake.NewSimpleClientset(test.service)
|
fakeClient := fake.NewSimpleClientset(test.service)
|
||||||
master.ServiceClient = fakeClient.CoreV1()
|
master.client = fakeClient
|
||||||
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
|
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||||
@ -414,7 +412,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
|
|||||||
for _, test := range nonReconcileTests {
|
for _, test := range nonReconcileTests {
|
||||||
master := Controller{}
|
master := Controller{}
|
||||||
fakeClient := fake.NewSimpleClientset(test.service)
|
fakeClient := fake.NewSimpleClientset(test.service)
|
||||||
master.ServiceClient = fakeClient.CoreV1()
|
master.client = fakeClient
|
||||||
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
|
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||||
@ -458,10 +456,7 @@ func Test_completedConfig_NewBootstrapController(t *testing.T) {
|
|||||||
|
|
||||||
type args struct {
|
type args struct {
|
||||||
legacyRESTStorage corerest.LegacyRESTStorage
|
legacyRESTStorage corerest.LegacyRESTStorage
|
||||||
serviceClient corev1client.ServicesGetter
|
client kubernetes.Interface
|
||||||
nsClient corev1client.NamespacesGetter
|
|
||||||
eventClient eventsv1client.EventsV1Interface
|
|
||||||
readyzClient rest.Interface
|
|
||||||
}
|
}
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@ -585,7 +580,7 @@ func Test_completedConfig_NewBootstrapController(t *testing.T) {
|
|||||||
GenericConfig: tt.config.Complete(nil),
|
GenericConfig: tt.config.Complete(nil),
|
||||||
ExtraConfig: tt.extraConfig,
|
ExtraConfig: tt.extraConfig,
|
||||||
}
|
}
|
||||||
_, err := c.NewBootstrapController(tt.args.legacyRESTStorage, tt.args.serviceClient, tt.args.nsClient, tt.args.eventClient, tt.args.readyzClient)
|
_, err := c.NewBootstrapController(tt.args.legacyRESTStorage, tt.args.client)
|
||||||
if (err != nil) != tt.wantErr {
|
if (err != nil) != tt.wantErr {
|
||||||
t.Errorf("completedConfig.NewBootstrapController() error = %v, wantErr %v", err, tt.wantErr)
|
t.Errorf("completedConfig.NewBootstrapController() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
return
|
return
|
||||||
|
@ -69,7 +69,6 @@ import (
|
|||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
|
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
|
||||||
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
|
|
||||||
"k8s.io/component-helpers/apimachinery/lease"
|
"k8s.io/component-helpers/apimachinery/lease"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
api "k8s.io/kubernetes/pkg/apis/core"
|
api "k8s.io/kubernetes/pkg/apis/core"
|
||||||
@ -531,9 +530,8 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
|
|||||||
}
|
}
|
||||||
|
|
||||||
controllerName := "bootstrap-controller"
|
controllerName := "bootstrap-controller"
|
||||||
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
client := kubernetes.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||||
eventsClient := eventsv1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, client)
|
||||||
bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, eventsClient, coreClient.RESTClient())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error creating bootstrap controller: %v", err)
|
return fmt.Errorf("error creating bootstrap controller: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user