mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 04:27:54 +00:00
kube-apiserver: remove IP repair plumbing from kubernetes service controller
This commit is contained in:
parent
51429cb5af
commit
65b34221eb
@ -21,28 +21,19 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/intstr"
|
"k8s.io/apimachinery/pkg/util/intstr"
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
|
||||||
"k8s.io/apimachinery/pkg/util/runtime"
|
"k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
||||||
"k8s.io/client-go/informers"
|
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
||||||
"k8s.io/kubernetes/pkg/features"
|
|
||||||
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
|
|
||||||
servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
|
|
||||||
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
|
|
||||||
"k8s.io/kubernetes/pkg/util/async"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -54,27 +45,11 @@ const (
|
|||||||
// provide the IP repair check on service IPs
|
// provide the IP repair check on service IPs
|
||||||
type Controller struct {
|
type Controller struct {
|
||||||
Config
|
Config
|
||||||
RangeRegistries
|
|
||||||
|
|
||||||
runner *async.Runner
|
client kubernetes.Interface
|
||||||
}
|
|
||||||
|
|
||||||
type RangeRegistries struct {
|
|
||||||
ServiceClusterIPRegistry rangeallocation.RangeRegistry
|
|
||||||
SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
|
|
||||||
ServiceNodePortRegistry rangeallocation.RangeRegistry
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Client kubernetes.Interface
|
|
||||||
Informers informers.SharedInformerFactory
|
|
||||||
|
|
||||||
KubernetesService
|
|
||||||
ClusterIP
|
|
||||||
NodePort
|
|
||||||
}
|
|
||||||
|
|
||||||
type KubernetesService struct {
|
|
||||||
PublicIP net.IP
|
PublicIP net.IP
|
||||||
|
|
||||||
EndpointReconciler reconcilers.EndpointReconciler
|
EndpointReconciler reconcilers.EndpointReconciler
|
||||||
@ -87,32 +62,17 @@ type KubernetesService struct {
|
|||||||
KubernetesServiceNodePort int
|
KubernetesServiceNodePort int
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClusterIP struct {
|
|
||||||
ServiceClusterIPRange net.IPNet
|
|
||||||
SecondaryServiceClusterIPRange net.IPNet
|
|
||||||
ServiceClusterIPInterval time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
type NodePort struct {
|
|
||||||
ServiceNodePortInterval time.Duration
|
|
||||||
ServiceNodePortRange utilnet.PortRange
|
|
||||||
}
|
|
||||||
|
|
||||||
// New returns a controller for watching the kubernetes service endpoints.
|
// New returns a controller for watching the kubernetes service endpoints.
|
||||||
func New(config Config, rangeRegistries RangeRegistries) (*Controller, error) {
|
func New(config Config, client kubernetes.Interface) *Controller {
|
||||||
return &Controller{
|
return &Controller{
|
||||||
Config: config,
|
Config: config,
|
||||||
RangeRegistries: rangeRegistries,
|
client: client,
|
||||||
}, nil
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins the core controller loops that must exist for bootstrapping
|
// Start begins the core controller loops that must exist for bootstrapping
|
||||||
// a cluster.
|
// a cluster.
|
||||||
func (c *Controller) Start() {
|
func (c *Controller) Start(stopCh <-chan struct{}) {
|
||||||
if c.runner != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reconcile during first run removing itself until server is ready.
|
// Reconcile during first run removing itself until server is ready.
|
||||||
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
|
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
|
||||||
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
|
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
|
||||||
@ -121,72 +81,11 @@ func (c *Controller) Start() {
|
|||||||
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
|
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.Client.CoreV1(), c.Client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
|
go c.Run(stopCh)
|
||||||
|
|
||||||
// We start both repairClusterIPs and repairNodePorts to ensure repair
|
|
||||||
// loops of ClusterIPs and NodePorts.
|
|
||||||
// We run both repair loops using RunUntil public interface.
|
|
||||||
// However, we want to fail liveness/readiness until the first
|
|
||||||
// successful repair loop, so we basically pass appropriate
|
|
||||||
// callbacks to RunUtil methods.
|
|
||||||
// Additionally, we ensure that we don't wait for it for longer
|
|
||||||
// than 1 minute for backward compatibility of failing the whole
|
|
||||||
// apiserver if we can't repair them.
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
runRepairNodePorts := func(stopCh chan struct{}) {
|
|
||||||
repairNodePorts.RunUntil(wg.Done, stopCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Add(1)
|
|
||||||
var runRepairClusterIPs func(stopCh chan struct{})
|
|
||||||
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
|
|
||||||
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval,
|
|
||||||
c.Client.CoreV1(),
|
|
||||||
c.Client.EventsV1(),
|
|
||||||
&c.ServiceClusterIPRange,
|
|
||||||
c.ServiceClusterIPRegistry,
|
|
||||||
&c.SecondaryServiceClusterIPRange,
|
|
||||||
c.SecondaryServiceClusterIPRegistry)
|
|
||||||
runRepairClusterIPs = func(stopCh chan struct{}) {
|
|
||||||
repairClusterIPs.RunUntil(wg.Done, stopCh)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval,
|
|
||||||
c.Client,
|
|
||||||
&c.ServiceClusterIPRange,
|
|
||||||
&c.SecondaryServiceClusterIPRange,
|
|
||||||
c.Informers.Core().V1().Services(),
|
|
||||||
c.Informers.Networking().V1alpha1().IPAddresses(),
|
|
||||||
)
|
|
||||||
runRepairClusterIPs = func(stopCh chan struct{}) {
|
|
||||||
repairClusterIPs.RunUntil(wg.Done, stopCh)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
|
|
||||||
c.runner.Start()
|
|
||||||
|
|
||||||
// For backward compatibility, we ensure that if we never are able
|
|
||||||
// to repair clusterIPs and/or nodeports, we not only fail the liveness
|
|
||||||
// and/or readiness, but also explicitly fail.
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer close(done)
|
|
||||||
wg.Wait()
|
|
||||||
}()
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
case <-time.After(time.Minute):
|
|
||||||
klog.Fatalf("Unable to perform initial IP and Port allocation check")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
|
// Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
|
||||||
func (c *Controller) Stop() {
|
func (c *Controller) Stop() {
|
||||||
if c.runner != nil {
|
|
||||||
c.runner.Stop()
|
|
||||||
}
|
|
||||||
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
|
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
|
||||||
finishedReconciling := make(chan struct{})
|
finishedReconciling := make(chan struct{})
|
||||||
go func() {
|
go func() {
|
||||||
@ -208,12 +107,12 @@ func (c *Controller) Stop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunKubernetesService periodically updates the kubernetes service
|
// Run periodically updates the kubernetes service
|
||||||
func (c *Controller) RunKubernetesService(ch chan struct{}) {
|
func (c *Controller) Run(ch <-chan struct{}) {
|
||||||
// wait until process is ready
|
// wait until process is ready
|
||||||
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
|
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
|
||||||
var code int
|
var code int
|
||||||
c.Client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
|
c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
|
||||||
return code == http.StatusOK, nil
|
return code == http.StatusOK, nil
|
||||||
}, ch)
|
}, ch)
|
||||||
|
|
||||||
@ -233,8 +132,8 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
|
|||||||
// TODO: when it becomes possible to change this stuff,
|
// TODO: when it becomes possible to change this stuff,
|
||||||
// stop polling and start watching.
|
// stop polling and start watching.
|
||||||
// TODO: add endpoints of all replicas, not just the elected master.
|
// TODO: add endpoints of all replicas, not just the elected master.
|
||||||
if _, err := c.Client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
|
if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
|
||||||
if _, err := c.Client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
|
if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: metav1.NamespaceDefault,
|
Name: metav1.NamespaceDefault,
|
||||||
Namespace: "",
|
Namespace: "",
|
||||||
@ -286,12 +185,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
|
|||||||
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
|
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
|
||||||
// doesn't already exist.
|
// doesn't already exist.
|
||||||
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
|
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
|
||||||
if s, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
|
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
|
||||||
// The service already exists.
|
// The service already exists.
|
||||||
if reconcile {
|
if reconcile {
|
||||||
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
|
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
|
||||||
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
|
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
|
||||||
_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
|
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -315,7 +214,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
|
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
|
||||||
if errors.IsAlreadyExists(err) {
|
if errors.IsAlreadyExists(err) {
|
||||||
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
|
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
|
|||||||
for _, test := range createTests {
|
for _, test := range createTests {
|
||||||
master := Controller{}
|
master := Controller{}
|
||||||
fakeClient := fake.NewSimpleClientset()
|
fakeClient := fake.NewSimpleClientset()
|
||||||
master.Client = fakeClient
|
master.client = fakeClient
|
||||||
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
|
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
|
||||||
creates := []core.CreateAction{}
|
creates := []core.CreateAction{}
|
||||||
for _, action := range fakeClient.Actions() {
|
for _, action := range fakeClient.Actions() {
|
||||||
@ -349,7 +349,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
|
|||||||
for _, test := range reconcileTests {
|
for _, test := range reconcileTests {
|
||||||
master := Controller{}
|
master := Controller{}
|
||||||
fakeClient := fake.NewSimpleClientset(test.service)
|
fakeClient := fake.NewSimpleClientset(test.service)
|
||||||
master.Client = fakeClient
|
master.client = fakeClient
|
||||||
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
|
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||||
@ -408,7 +408,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
|
|||||||
for _, test := range nonReconcileTests {
|
for _, test := range nonReconcileTests {
|
||||||
master := Controller{}
|
master := Controller{}
|
||||||
fakeClient := fake.NewSimpleClientset(test.service)
|
fakeClient := fake.NewSimpleClientset(test.service)
|
||||||
master.Client = fakeClient
|
master.client = fakeClient
|
||||||
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
|
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||||
|
Loading…
Reference in New Issue
Block a user