diff --git a/pkg/controlplane/controller/kubernetesservice/controller.go b/pkg/controlplane/controller/kubernetesservice/controller.go
index 8c72f06b3de..1c5ec0590b0 100644
--- a/pkg/controlplane/controller/kubernetesservice/controller.go
+++ b/pkg/controlplane/controller/kubernetesservice/controller.go
@@ -28,21 +28,13 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
-	utilnet "k8s.io/apimachinery/pkg/util/net"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/storage"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
-	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
-	"k8s.io/kubernetes/pkg/features"
-	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
-	servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
-	portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
-	"k8s.io/kubernetes/pkg/util/async"
 )
 
 const (
@@ -54,27 +46,14 @@ const (
 // provide the IP repair check on service IPs
 type Controller struct {
 	Config
-	RangeRegistries
 
-	runner *async.Runner
-}
+	client kubernetes.Interface
 
-type RangeRegistries struct {
-	ServiceClusterIPRegistry          rangeallocation.RangeRegistry
-	SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
-	ServiceNodePortRegistry           rangeallocation.RangeRegistry
+	lock   sync.Mutex
+	stopCh chan struct{} // closed by Stop()
 }
 
 type Config struct {
-	Client    kubernetes.Interface
-	Informers informers.SharedInformerFactory
-
-	KubernetesService
-	ClusterIP
-	NodePort
-}
-
-type KubernetesService struct {
 	PublicIP net.IP
 
 	EndpointReconciler reconcilers.EndpointReconciler
@@ -87,32 +66,18 @@ type KubernetesService struct {
 	KubernetesServiceNodePort int
 }
 
-type ClusterIP struct {
-	ServiceClusterIPRange          net.IPNet
-	SecondaryServiceClusterIPRange net.IPNet
-	ServiceClusterIPInterval       time.Duration
-}
-
-type NodePort struct {
-	ServiceNodePortInterval time.Duration
-	ServiceNodePortRange    utilnet.PortRange
-}
-
 // New returns a controller for watching the kubernetes service endpoints.
-func New(config Config, rangeRegistries RangeRegistries) (*Controller, error) {
+func New(config Config, client kubernetes.Interface) *Controller {
 	return &Controller{
-		Config:          config,
-		RangeRegistries: rangeRegistries,
-	}, nil
+		Config: config,
+		client: client,
+		stopCh: make(chan struct{}),
+	}
 }
 
 // Start begins the core controller loops that must exist for bootstrapping
 // a cluster.
-func (c *Controller) Start() {
-	if c.runner != nil {
-		return
-	}
-
+func (c *Controller) Start(stopCh <-chan struct{}) {
 	// Reconcile during first run removing itself until server is ready.
 	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
 	if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
@@ -121,72 +86,30 @@ func (c *Controller) Start() {
 		klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
 	}
 
-	repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.Client.CoreV1(), c.Client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
-
-	// We start both repairClusterIPs and repairNodePorts to ensure repair
-	// loops of ClusterIPs and NodePorts.
-	// We run both repair loops using RunUntil public interface.
-	// However, we want to fail liveness/readiness until the first
-	// successful repair loop, so we basically pass appropriate
-	// callbacks to RunUtil methods.
-	// Additionally, we ensure that we don't wait for it for longer
-	// than 1 minute for backward compatibility of failing the whole
-	// apiserver if we can't repair them.
-	wg := sync.WaitGroup{}
-	wg.Add(1)
-
-	runRepairNodePorts := func(stopCh chan struct{}) {
-		repairNodePorts.RunUntil(wg.Done, stopCh)
-	}
-
-	wg.Add(1)
-	var runRepairClusterIPs func(stopCh chan struct{})
-	if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
-		repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval,
-			c.Client.CoreV1(),
-			c.Client.EventsV1(),
-			&c.ServiceClusterIPRange,
-			c.ServiceClusterIPRegistry,
-			&c.SecondaryServiceClusterIPRange,
-			c.SecondaryServiceClusterIPRegistry)
-		runRepairClusterIPs = func(stopCh chan struct{}) {
-			repairClusterIPs.RunUntil(wg.Done, stopCh)
-		}
-	} else {
-		repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval,
-			c.Client,
-			&c.ServiceClusterIPRange,
-			&c.SecondaryServiceClusterIPRange,
-			c.Informers.Core().V1().Services(),
-			c.Informers.Networking().V1alpha1().IPAddresses(),
-		)
-		runRepairClusterIPs = func(stopCh chan struct{}) {
-			repairClusterIPs.RunUntil(wg.Done, stopCh)
-		}
-	}
-	c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
-	c.runner.Start()
-
-	// For backward compatibility, we ensure that if we never are able
-	// to repair clusterIPs and/or nodeports, we not only fail the liveness
-	// and/or readiness, but also explicitly fail.
-	done := make(chan struct{})
+	localStopCh := make(chan struct{})
 	go func() {
-		defer close(done)
-		wg.Wait()
+		defer close(localStopCh)
+		select {
+		case <-stopCh: // from Start
+		case <-c.stopCh: // from Stop
+		}
 	}()
-	select {
-	case <-done:
-	case <-time.After(time.Minute):
-		klog.Fatalf("Unable to perform initial IP and Port allocation check")
-	}
+
+	go c.Run(localStopCh)
 }
 
 // Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
 func (c *Controller) Stop() {
-	if c.runner != nil {
-		c.runner.Stop()
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	select {
+	case <-c.stopCh:
+		return // only close once
+	default:
+		close(c.stopCh)
 	}
+
 	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
 	finishedReconciling := make(chan struct{})
 	go func() {
@@ -208,12 +131,12 @@ func (c *Controller) Stop() {
 	}
 }
 
-// RunKubernetesService periodically updates the kubernetes service
-func (c *Controller) RunKubernetesService(ch chan struct{}) {
+// Run periodically updates the kubernetes service
+func (c *Controller) Run(ch <-chan struct{}) {
 	// wait until process is ready
 	wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
 		var code int
-		c.Client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
+		c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
 		return code == http.StatusOK, nil
 	}, ch)
 
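Note: the Start/Stop rewiring above replaces the async.Runner with two plain channels. Start merges the post-start hook's stop channel with the controller's own stopCh, and Stop guards against a double close. A minimal standalone sketch of the same pattern (package and type names here are illustrative, not part of the patch):

package stoputil

import "sync"

// lifecycle merges an external stop signal with an internal one that
// Stop may trigger; the internal channel is closed at most once.
type lifecycle struct {
	lock   sync.Mutex
	stopCh chan struct{} // closed by Stop()
}

// Start returns a channel that closes when either source fires,
// mirroring the localStopCh wiring in Controller.Start.
func (l *lifecycle) Start(external <-chan struct{}) <-chan struct{} {
	merged := make(chan struct{})
	go func() {
		defer close(merged)
		select {
		case <-external: // e.g. the post-start hook's StopCh
		case <-l.stopCh: // e.g. a pre-shutdown hook calling Stop
		}
	}()
	return merged
}

// Stop closes the internal channel exactly once; repeated calls return early.
func (l *lifecycle) Stop() {
	l.lock.Lock()
	defer l.lock.Unlock()
	select {
	case <-l.stopCh:
		return // only close once
	default:
		close(l.stopCh)
	}
}

sync.Once would work equally well for the close-once half; the select/default form matches the patch and keeps the early return explicit.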
@@ -233,8 +156,8 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
 	// TODO: when it becomes possible to change this stuff,
 	// stop polling and start watching.
 	// TODO: add endpoints of all replicas, not just the elected master.
-	if _, err := c.Client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
-		if _, err := c.Client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
+	if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
+		if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      metav1.NamespaceDefault,
 				Namespace: "",
@@ -286,12 +209,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
 // CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
 // doesn't already exist.
 func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
-	if s, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
+	if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
 		// The service already exists.
 		if reconcile {
 			if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
 				klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
-				_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
+				_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
 				return err
 			}
 		}
@@ -315,7 +238,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
 		},
 	}
 
-	_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
+	_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
 	if errors.IsAlreadyExists(err) {
 		return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
 	}
diff --git a/pkg/controlplane/controller/kubernetesservice/controller_test.go b/pkg/controlplane/controller/kubernetesservice/controller_test.go
index 5e370e82dcb..2cf20f3a9ef 100644
--- a/pkg/controlplane/controller/kubernetesservice/controller_test.go
+++ b/pkg/controlplane/controller/kubernetesservice/controller_test.go
@@ -67,7 +67,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
 	for _, test := range createTests {
 		master := Controller{}
 		fakeClient := fake.NewSimpleClientset()
-		master.Client = fakeClient
+		master.client = fakeClient
 		master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
 		creates := []core.CreateAction{}
 		for _, action := range fakeClient.Actions() {
@@ -349,7 +349,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
 	for _, test := range reconcileTests {
 		master := Controller{}
 		fakeClient := fake.NewSimpleClientset(test.service)
-		master.Client = fakeClient
+		master.client = fakeClient
 		err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
 		if err != nil {
 			t.Errorf("case %q: unexpected error: %v", test.testName, err)
@@ -408,7 +408,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
 	for _, test := range nonReconcileTests {
 		master := Controller{}
 		fakeClient := fake.NewSimpleClientset(test.service)
-		master.Client = fakeClient
+		master.client = fakeClient
 		err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
 		if err != nil {
 			t.Errorf("case %q: unexpected error: %v", test.testName, err)
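Note: the test changes above only rename the exported Client field to the unexported client (possible because the tests live in the same package); the fake-clientset pattern is unchanged. For readers unfamiliar with it, a condensed sketch of the bookkeeping the table tests do inline (the helper name is illustrative):

package kubernetesservice

import (
	"k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
)

// countCreates replays the actions recorded by the fake clientset and
// counts the creates, as the table-driven tests above do for each case.
func countCreates(fakeClient *fake.Clientset) int {
	n := 0
	for _, action := range fakeClient.Actions() {
		if _, ok := action.(core.CreateAction); ok {
			n++
		}
	}
	return n
}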
diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go
index 59e530c4444..807ff6d69ce 100644
--- a/pkg/controlplane/instance.go
+++ b/pkg/controlplane/instance.go
@@ -395,12 +395,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
 	}
 
-	// install legacy rest storage
-
-	if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter); err != nil {
-		return nil, err
-	}
-
 	clientset, err := kubernetes.NewForConfig(c.GenericConfig.LoopbackClientConfig)
 	if err != nil {
 		return nil, err
@@ -409,6 +403,32 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 	// TODO: update to a version that caches success but will recheck on failure, unlike memcache discovery
 	discoveryClientForAdmissionRegistration := clientset.Discovery()
 
+	legacyRESTStorageProvider, err := corerest.New(corerest.Config{
+		GenericConfig: corerest.GenericConfig{
+			StorageFactory:              c.ExtraConfig.StorageFactory,
+			EventTTL:                    c.ExtraConfig.EventTTL,
+			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
+			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
+			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
+			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
+			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
+			Informers:                   c.ExtraConfig.VersionedInformers,
+		},
+		Proxy: corerest.ProxyConfig{
+			Transport:           c.ExtraConfig.ProxyTransport,
+			KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
+		},
+		Services: corerest.ServicesConfig{
+			ClusterIPRange:          c.ExtraConfig.ServiceIPRange,
+			SecondaryClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
+			NodePortRange:           c.ExtraConfig.ServiceNodePortRange,
+			IPRepairInterval:        c.ExtraConfig.RepairServicesInterval,
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+
 	// The order here is preserved in discovery.
 	// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
 	// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
@@ -417,6 +437,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 	// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
 	// handlers that we have.
 	restStorageProviders := []RESTStorageProvider{
+		legacyRESTStorageProvider,
 		apiserverinternalrest.StorageProvider{},
 		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
 		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
@@ -443,12 +464,37 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 		return nil, err
 	}
 
+	m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
+		go systemnamespaces.NewController(clientset, c.ExtraConfig.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
+		return nil
+	})
+
+	_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get listener address: %w", err)
+	}
+	kubernetesServiceCtrl := kubernetesservice.New(kubernetesservice.Config{
+		PublicIP: c.GenericConfig.PublicAddress,
+
+		EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
+		EndpointInterval:   c.ExtraConfig.EndpointReconcilerConfig.Interval,
+
+		ServiceIP:                 c.ExtraConfig.APIServerServiceIP,
+		ServicePort:               c.ExtraConfig.APIServerServicePort,
+		PublicServicePort:         publicServicePort,
+		KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
+	}, clientset)
+	m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
+		kubernetesServiceCtrl.Start(hookContext.StopCh)
+		return nil
+	})
+	m.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
+		kubernetesServiceCtrl.Stop()
+		return nil
+	})
+
 	m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
-		kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
-		if err != nil {
-			return err
-		}
-		controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
+		controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, clientset)
 
 		// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
 		// TODO: See if we can pass ctx to the current method
@@ -572,93 +618,6 @@ func labelAPIServerHeartbeatFunc(identity string) lease.ProcessLeaseFunc {
 	}
 }
 
-// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
-func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {
-	legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
-		GenericLegacyRESTStorageProvider: corerest.GenericLegacyRESTStorageProvider{
-			StorageFactory:              c.ExtraConfig.StorageFactory,
-			EventTTL:                    c.ExtraConfig.EventTTL,
-			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
-			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
-			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
-			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
-			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
-			Informers:                   c.ExtraConfig.VersionedInformers,
-		},
-		ProxyTransport:          c.ExtraConfig.ProxyTransport,
-		KubeletClientConfig:     c.ExtraConfig.KubeletClientConfig,
-		ServiceIPRange:          c.ExtraConfig.ServiceIPRange,
-		SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
-		ServiceNodePortRange:    c.ExtraConfig.ServiceNodePortRange,
-	}
-	rangeRegistries, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
-	if err != nil {
-		return fmt.Errorf("error building core storage: %v", err)
-	}
-	if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 { // if all core storage is disabled, return.
-		return nil
-	}
-
-	controllerName := "bootstrap-controller"
-	client := kubernetes.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
-	// Kubernetes clusters contains the following system namespaces:
-	// kube-system, kube-node-lease, kube-public, default
-	m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
-		go systemnamespaces.NewController(client, c.ExtraConfig.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
-		return nil
-	})
-
-	kubenetesserviceConfig, err := c.newKubernetesServiceControllerConfig(client)
-	if err != nil {
-		return err
-	}
-	bootstrapController, err := kubernetesservice.New(*kubenetesserviceConfig, rangeRegistries)
-	if err != nil {
-		return fmt.Errorf("error creating bootstrap controller: %v", err)
-	}
-	m.GenericAPIServer.AddPostStartHookOrDie(controllerName, func(genericapiserver.PostStartHookContext) error { bootstrapController.Start(); return nil })
-	m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, func() error { bootstrapController.Stop(); return nil })
-
-	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
-		return fmt.Errorf("error in registering group versions: %v", err)
-	}
-	return nil
-}
-
-// newKubernetesServiceControllerConfig returns a configuration for the kubernetes service controller.
-func (c completedConfig) newKubernetesServiceControllerConfig(client kubernetes.Interface) (*kubernetesservice.Config, error) {
-	_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
-	if err != nil {
-		return nil, fmt.Errorf("failed to get listener address: %w", err)
-	}
-
-	return &kubernetesservice.Config{
-		Client:    client,
-		Informers: c.ExtraConfig.VersionedInformers,
-		KubernetesService: kubernetesservice.KubernetesService{
-			PublicIP: c.GenericConfig.PublicAddress,
-
-			EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
-			EndpointInterval:   c.ExtraConfig.EndpointReconcilerConfig.Interval,
-
-			ServiceIP:                 c.ExtraConfig.APIServerServiceIP,
-			ServicePort:               c.ExtraConfig.APIServerServicePort,
-			PublicServicePort:         publicServicePort,
-			KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
-		},
-		ClusterIP: kubernetesservice.ClusterIP{
-			ServiceClusterIPRange:          c.ExtraConfig.ServiceIPRange,
-			SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
-
-			ServiceClusterIPInterval: c.ExtraConfig.RepairServicesInterval,
-		},
-		NodePort: kubernetesservice.NodePort{
-			ServiceNodePortRange:    c.ExtraConfig.ServiceNodePortRange,
-			ServiceNodePortInterval: c.ExtraConfig.RepairServicesInterval,
-		},
-	}, nil
-}
-
 // RESTStorageProvider is a factory type for REST storage.
 type RESTStorageProvider interface {
 	GroupName() string
@@ -667,7 +626,7 @@ type RESTStorageProvider interface {
 
 // InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
 func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
-	apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
+	nonLegacy := []*genericapiserver.APIGroupInfo{}
 
 	// used later in the loop to filter the served resource by those that have expired.
 	resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*m.GenericAPIServer.Version)
@@ -707,10 +666,18 @@ func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResource
 		m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
 	}
 
-	apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
+	if len(groupName) == 0 {
+		// the legacy group for core APIs is special in that it is installed into /api (not /apis) via this dedicated install method.
+		if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
+			return fmt.Errorf("error in registering legacy API: %w", err)
+		}
+	} else {
+		// everything else goes to /apis
+		nonLegacy = append(nonLegacy, &apiGroupInfo)
+	}
 }
 
-	if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
+	if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
 		return fmt.Errorf("error in registering group versions: %v", err)
 	}
 	return nil
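Note: with the legacy provider in the restStorageProviders list, InstallAPIs is now the single place that decides where a group is mounted. A minimal sketch of the contract a list member must satisfy; the core provider implements exactly this, plus the optional PostStartHook shown later in the patch (names other than the k8s.io imports are illustrative):

package example

import (
	"k8s.io/apiserver/pkg/registry/generic"
	genericapiserver "k8s.io/apiserver/pkg/server"
	serverstorage "k8s.io/apiserver/pkg/server/storage"
)

type exampleProvider struct{}

// GroupName returns "" for the core group, which is why the
// len(groupName) == 0 branch above routes it to /api; every other
// provider returns a non-empty name and is served under /apis.
func (exampleProvider) GroupName() string { return "" }

// NewRESTStorage would populate an APIGroupInfo with versioned storage;
// returning an empty one here keeps the sketch self-contained.
func (exampleProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
	return genericapiserver.APIGroupInfo{}, nil
}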
diff --git a/pkg/controlplane/instance_test.go b/pkg/controlplane/instance_test.go
index 7c540d5e6e9..9fdb35d5c2e 100644
--- a/pkg/controlplane/instance_test.go
+++ b/pkg/controlplane/instance_test.go
@@ -152,20 +152,27 @@ func TestLegacyRestStorageStrategies(t *testing.T) {
 	_, etcdserver, apiserverCfg, _ := newInstance(t)
 	defer etcdserver.Terminate(t)
 
-	storageProvider := corerest.LegacyRESTStorageProvider{
-		GenericLegacyRESTStorageProvider: corerest.GenericLegacyRESTStorageProvider{
+	storageProvider, err := corerest.New(corerest.Config{
+		GenericConfig: corerest.GenericConfig{
 			StorageFactory:       apiserverCfg.ExtraConfig.StorageFactory,
 			EventTTL:             apiserverCfg.ExtraConfig.EventTTL,
 			LoopbackClientConfig: apiserverCfg.GenericConfig.LoopbackClientConfig,
 			Informers:            apiserverCfg.ExtraConfig.VersionedInformers,
 		},
-		ProxyTransport:       apiserverCfg.ExtraConfig.ProxyTransport,
-		KubeletClientConfig:  apiserverCfg.ExtraConfig.KubeletClientConfig,
-		ServiceIPRange:       apiserverCfg.ExtraConfig.ServiceIPRange,
-		ServiceNodePortRange: apiserverCfg.ExtraConfig.ServiceNodePortRange,
+		Proxy: corerest.ProxyConfig{
+			Transport:           apiserverCfg.ExtraConfig.ProxyTransport,
+			KubeletClientConfig: apiserverCfg.ExtraConfig.KubeletClientConfig,
+		},
+		Services: corerest.ServicesConfig{
+			ClusterIPRange: apiserverCfg.ExtraConfig.ServiceIPRange,
+			NodePortRange:  apiserverCfg.ExtraConfig.ServiceNodePortRange,
+		},
+	})
+	if err != nil {
+		t.Fatalf("unexpected error from REST storage: %v", err)
 	}
 
-	_, apiGroupInfo, err := storageProvider.NewLegacyRESTStorage(serverstorage.NewResourceConfig(), apiserverCfg.GenericConfig.RESTOptionsGetter)
+	apiGroupInfo, err := storageProvider.NewRESTStorage(serverstorage.NewResourceConfig(), apiserverCfg.GenericConfig.RESTOptionsGetter)
 	if err != nil {
 		t.Errorf("failed to create legacy REST storage: %v", err)
 	}
diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go
index c004aaf0408..5b446dedec5 100644
--- a/pkg/registry/core/rest/storage_core.go
+++ b/pkg/registry/core/rest/storage_core.go
@@ -18,9 +18,11 @@ package rest
 import (
 	"crypto/tls"
+	goerrors "errors"
 	"fmt"
 	"net"
 	"net/http"
+	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -33,13 +35,14 @@ import (
 	serverstorage "k8s.io/apiserver/pkg/server/storage"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
 	networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
 	policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
 	restclient "k8s.io/client-go/rest"
+
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	api "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/cluster/ports"
-	"k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice"
 	"k8s.io/kubernetes/pkg/features"
 	kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
 	"k8s.io/kubernetes/pkg/registry/core/componentstatus"
@@ -53,23 +56,26 @@ import (
 	pvcstore "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage"
 	podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
 	podtemplatestore "k8s.io/kubernetes/pkg/registry/core/podtemplate/storage"
+	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
 	controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
 	resourcequotastore "k8s.io/kubernetes/pkg/registry/core/resourcequota/storage"
 	secretstore "k8s.io/kubernetes/pkg/registry/core/secret/storage"
 	"k8s.io/kubernetes/pkg/registry/core/service/allocator"
 	serviceallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
 	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
+	serviceipallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
 	"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
+	portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
 	servicestore "k8s.io/kubernetes/pkg/registry/core/service/storage"
 	serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
 	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/serviceaccount"
+	"k8s.io/kubernetes/pkg/util/async"
 )
 
-// GenericLegacyRESTStorageProvider provides information needed to build RESTStorage
-// for generic resources in core, but does NOT implement the "normal"
-// RESTStorageProvider (yet!)
-type GenericLegacyRESTStorageProvider struct {
+// GenericConfig provides information needed to build RESTStorage
+// for generic resources in core. It implements the "normal" RESTStorageProvider interface.
+type GenericConfig struct {
 	StorageFactory serverstorage.StorageFactory
 	EventTTL       time.Duration
@@ -83,24 +89,91 @@ type GenericLegacyRESTStorageProvider struct {
 	Informers informers.SharedInformerFactory
 }
 
-// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
-// does NOT implement the "normal" RESTStorageProvider (yet!)
-type LegacyRESTStorageProvider struct {
-	GenericLegacyRESTStorageProvider
+// Config provides information needed to build RESTStorage for core.
+type Config struct {
+	GenericConfig
 
-	// Used for custom proxy dialing, and proxy TLS options
-	ProxyTransport      http.RoundTripper
-	KubeletClientConfig kubeletclient.KubeletClientConfig
-
-	// ServiceIPRange is used to build cluster IPs for discovery.
-	ServiceIPRange net.IPNet
-
-	// allocates ips for secondary service cidr in dual stack clusters
-	SecondaryServiceIPRange net.IPNet
-	ServiceNodePortRange    utilnet.PortRange
+	Proxy    ProxyConfig
+	Services ServicesConfig
 }
 
-func (c GenericLegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
+type ProxyConfig struct {
+	Transport           http.RoundTripper
+	KubeletClientConfig kubeletclient.KubeletClientConfig
+}
+
+type ServicesConfig struct {
+	// Service IP ranges
+	ClusterIPRange          net.IPNet
+	SecondaryClusterIPRange net.IPNet
+	NodePortRange           utilnet.PortRange
+
+	IPRepairInterval time.Duration
+}
+
+type rangeRegistries struct {
+	clusterIP          rangeallocation.RangeRegistry
+	secondaryClusterIP rangeallocation.RangeRegistry
+	nodePort           rangeallocation.RangeRegistry
+}
+
+type legacyProvider struct {
+	Config
+
+	primaryServiceClusterIPAllocator ipallocator.Interface
+	serviceClusterIPAllocators       map[api.IPFamily]ipallocator.Interface
+	serviceNodePortAllocator         *portallocator.PortAllocator
+
+	startServiceNodePortsRepair, startServiceClusterIPRepair func(onFirstSuccess func(), stopCh chan struct{})
+}
+
+func New(c Config) (*legacyProvider, error) {
+	rangeRegistries, serviceClusterIPAllocator, serviceIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
+	if err != nil {
+		return nil, err
+	}
+
+	p := &legacyProvider{
+		Config: c,
+
+		primaryServiceClusterIPAllocator: serviceClusterIPAllocator,
+		serviceClusterIPAllocators:       serviceIPAllocators,
+		serviceNodePortAllocator:         serviceNodePortAllocator,
+	}
+
+	// create service node port repair controller
+	client, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
+	if err != nil {
+		return nil, err
+	}
+	p.startServiceNodePortsRepair = portallocatorcontroller.NewRepair(c.Services.IPRepairInterval, client.CoreV1(), client.EventsV1(), c.Services.NodePortRange, rangeRegistries.nodePort).RunUntil
+
+	// create service cluster ip repair controller
+	if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
+		p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepair(
+			c.Services.IPRepairInterval,
+			client.CoreV1(),
+			client.EventsV1(),
+			&c.Services.ClusterIPRange,
+			rangeRegistries.clusterIP,
+			&c.Services.SecondaryClusterIPRange,
+			rangeRegistries.secondaryClusterIP,
+		).RunUntil
+	} else {
+		p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepairIPAddress(
+			c.Services.IPRepairInterval,
+			client,
+			&c.Services.ClusterIPRange,
+			&c.Services.SecondaryClusterIPRange,
+			c.Informers.Core().V1().Services(),
+			c.Informers.Networking().V1alpha1().IPAddresses(),
+		).RunUntil
+	}
+
+	return p, nil
+}
+
+func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
 	apiGroupInfo := genericapiserver.APIGroupInfo{
 		PrioritizedVersions:          legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
 		VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
@@ -180,70 +253,71 @@ func (c GenericLegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource
 	return apiGroupInfo, nil
 }
 
-func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (kubernetesservice.RangeRegistries, genericapiserver.APIGroupInfo, error) {
-	apiGroupInfo, err := c.GenericLegacyRESTStorageProvider.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
+
+func (c *GenericConfig) GroupName() string {
+	return api.GroupName
+}
+
+func (c *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
+	apiGroupInfo, err := c.GenericConfig.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 
 	podDisruptionClient, err := policyclient.NewForConfig(c.LoopbackClientConfig)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 	podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 
 	limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 
 	persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 	persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 	endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 
-	nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
+	nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.Proxy.KubeletClientConfig, c.Proxy.Transport)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 
 	podStorage, err := podstore.NewStorage(
 		restOptionsGetter,
 		nodeStorage.KubeletConnectionInfo,
-		c.ProxyTransport,
+		c.Proxy.Transport,
 		podDisruptionClient,
 	)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 
-	rangeRegistries, primaryServiceClusterIPAllocator, serviceClusterIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
-	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
-	}
 	serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST(
 		restOptionsGetter,
-		primaryServiceClusterIPAllocator.IPFamily(),
-		serviceClusterIPAllocators,
-		serviceNodePortAllocator,
+		c.primaryServiceClusterIPAllocator.IPFamily(),
+		c.serviceClusterIPAllocators,
+		c.serviceNodePortAllocator,
 		endpointsStorage,
 		podStorage.Pod,
-		c.ProxyTransport)
+		c.Proxy.Transport)
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+		return genericapiserver.APIGroupInfo{}, err
 	}
 
 	storage := apiGroupInfo.VersionedResourcesStorageMap["v1"]
@@ -256,7 +330,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
 	if c.ServiceAccountIssuer != nil {
 		serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, storage["secrets"].(rest.Getter), c.ExtendExpiration)
 		if err != nil {
-			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+			return genericapiserver.APIGroupInfo{}, err
 		}
 	}
 
@@ -285,7 +359,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
 	if resource := "replicationcontrollers"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
 		controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
 		if err != nil {
-			return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
+			return genericapiserver.APIGroupInfo{}, err
 		}
 
 		storage[resource] = controllerStorage.Controller
@@ -347,20 +421,20 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
 		apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
 	}
 
-	return rangeRegistries, apiGroupInfo, nil
+	return apiGroupInfo, nil
 }
 
-func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries kubernetesservice.RangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
+func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
 	clusterIPAllocators = map[api.IPFamily]ipallocator.Interface{}
 
 	serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
+		return rangeRegistries{}, nil, nil, nil, err
 	}
 
-	serviceClusterIPRange := c.ServiceIPRange
+	serviceClusterIPRange := c.Services.ClusterIPRange
 	if serviceClusterIPRange.IP == nil {
-		return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("service clusterIPRange is missing")
+		return rangeRegistries{}, nil, nil, nil, fmt.Errorf("service clusterIPRange is missing")
 	}
 
 	if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
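Note: the next hunk keeps the existing pattern in which the allocator factory closure smuggles the etcd-backed registry out to the caller, now into the package-private rangeRegistries so New can hand it to the repair loops. Distilled under illustrative names (registryBackedAllocator and makeEtcdBacked stand in for the concrete etcd-backed type from serviceallocator.NewEtcd and are assumptions of this sketch):

package allocexample

import (
	"net"

	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
	"k8s.io/kubernetes/pkg/registry/core/service/allocator"
	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
)

// registryBackedAllocator combines the two roles the etcd-backed
// allocator plays in the real code: in-memory allocator and range registry.
type registryBackedAllocator interface {
	allocator.Interface
	rangeallocation.RangeRegistry
}

// newTrackedAllocator builds an IP allocator and captures the registry
// created inside the factory callback, mirroring newServiceIPAllocators.
func newTrackedAllocator(cidr *net.IPNet, makeEtcdBacked func(allocator.Snapshottable) (registryBackedAllocator, error)) (ipallocator.Interface, rangeallocation.RangeRegistry, error) {
	var captured rangeallocation.RangeRegistry
	alloc, err := ipallocator.New(cidr, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
		mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
		etcd, err := makeEtcdBacked(mem)
		if err != nil {
			return nil, err
		}
		captured = etcd // side effect: expose the registry to the caller
		return etcd, nil
	})
	return alloc, captured, err
}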
@@ -372,30 +446,30 @@ func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries kubernet
 			if err != nil {
 				return nil, err
 			}
-			registries.ServiceClusterIPRegistry = etcd
+			registries.clusterIP = etcd
 			return etcd, nil
 		})
 		if err != nil {
-			return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
+			return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
 		}
 	} else {
 		networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
 		if err != nil {
-			return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
+			return rangeRegistries{}, nil, nil, nil, err
 		}
 		primaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&serviceClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
 		if err != nil {
-			return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
+			return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
 		}
 	}
 	primaryClusterIPAllocator.EnableMetrics()
 	clusterIPAllocators[primaryClusterIPAllocator.IPFamily()] = primaryClusterIPAllocator
 
 	var secondaryClusterIPAllocator ipallocator.Interface
-	if c.SecondaryServiceIPRange.IP != nil {
+	if c.Services.SecondaryClusterIPRange.IP != nil {
 		if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
 			var err error
-			secondaryClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
+			secondaryClusterIPAllocator, err = ipallocator.New(&c.Services.SecondaryClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
 				var mem allocator.Snapshottable
 				mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
 				// TODO etcdallocator package to return a storage interface via the storageFactory
@@ -403,45 +477,88 @@ func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries kubernet
 				if err != nil {
 					return nil, err
 				}
-				registries.SecondaryServiceClusterIPRegistry = etcd
+				registries.secondaryClusterIP = etcd
 				return etcd, nil
 			})
 			if err != nil {
-				return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
+				return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
 			}
 		} else {
 			networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
 			if err != nil {
-				return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
+				return rangeRegistries{}, nil, nil, nil, err
 			}
-			secondaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
+			secondaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.Services.SecondaryClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
 			if err != nil {
-				return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
+				return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
 			}
 		}
 		secondaryClusterIPAllocator.EnableMetrics()
 		clusterIPAllocators[secondaryClusterIPAllocator.IPFamily()] = secondaryClusterIPAllocator
 	}
 
-	nodePortAllocator, err = portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
+	nodePortAllocator, err = portallocator.New(c.Services.NodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
 		mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
 		// TODO etcdallocator package to return a storage interface via the storageFactory
 		etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations")))
 		if err != nil {
 			return nil, err
 		}
-		registries.ServiceNodePortRegistry = etcd
+		registries.nodePort = etcd
 		return etcd, nil
 	})
 	if err != nil {
-		return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster port allocator: %v", err)
+		return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster port allocator: %v", err)
 	}
 	nodePortAllocator.EnableMetrics()
 
 	return
 }
 
-func (p LegacyRESTStorageProvider) GroupName() string {
+var _ genericapiserver.PostStartHookProvider = &legacyProvider{}
+
+func (p *legacyProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
+	return "start-service-ip-repair-controllers", func(context genericapiserver.PostStartHookContext) error {
+		// We start both repairClusterIPs and repairNodePorts to ensure repair
+		// loops of ClusterIPs and NodePorts.
+		// We run both repair loops using RunUntil public interface.
+		// However, we want to fail liveness/readiness until the first
+		// successful repair loop, so we basically pass appropriate
+		// callbacks to RunUntil methods.
+		// Additionally, we ensure that we don't wait for it for longer
+		// than 1 minute for backward compatibility of failing the whole
+		// apiserver if we can't repair them.
+		wg := sync.WaitGroup{}
+		wg.Add(2)
+		runner := async.NewRunner(
+			func(stopCh chan struct{}) { p.startServiceClusterIPRepair(wg.Done, stopCh) },
+			func(stopCh chan struct{}) { p.startServiceNodePortsRepair(wg.Done, stopCh) },
+		)
+		runner.Start()
+		go func() {
+			defer runner.Stop()
+			<-context.StopCh
+		}()
+
+		// For backward compatibility, we ensure that if we never are able
+		// to repair clusterIPs and/or nodeports, we not only fail the liveness
+		// and/or readiness, but also explicitly fail.
+		done := make(chan struct{})
+		go func() {
+			defer close(done)
+			wg.Wait()
+		}()
+		select {
+		case <-done:
+		case <-time.After(time.Minute):
+			return goerrors.New("unable to perform initial IP and Port allocation check")
+		}
+
+		return nil
+	}, nil
+}
+
+func (p *legacyProvider) GroupName() string {
 	return api.GroupName
 }
diff --git a/test/e2e/apimachinery/health_handlers.go b/test/e2e/apimachinery/health_handlers.go
index 22cdbb72d9f..2c6190c0c86 100644
--- a/test/e2e/apimachinery/health_handlers.go
+++ b/test/e2e/apimachinery/health_handlers.go
@@ -44,6 +44,7 @@ var (
 	"[+]poststarthook/crd-informer-synced ok",
 	"[+]poststarthook/bootstrap-controller ok",
 	"[+]poststarthook/start-system-namespaces-controller ok",
+	"[+]poststarthook/start-service-ip-repair-controllers ok",
 	"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
 	"[+]poststarthook/start-cluster-authentication-info-controller ok",
 	"[+]poststarthook/start-kube-aggregator-informers ok",
@@ -64,6 +65,7 @@ var (
 	"[+]poststarthook/crd-informer-synced ok",
 	"[+]poststarthook/bootstrap-controller ok",
 	"[+]poststarthook/start-system-namespaces-controller ok",
+	"[+]poststarthook/start-service-ip-repair-controllers ok",
 	"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
 	"[+]poststarthook/start-cluster-authentication-info-controller ok",
 	"[+]poststarthook/start-kube-aggregator-informers ok",
@@ -85,6 +87,7 @@ var (
 	"[+]poststarthook/crd-informer-synced ok",
 	"[+]poststarthook/bootstrap-controller ok",
 	"[+]poststarthook/start-system-namespaces-controller ok",
+	"[+]poststarthook/start-service-ip-repair-controllers ok",
 	"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
 	"[+]poststarthook/start-cluster-authentication-info-controller ok",
 	"[+]poststarthook/start-kube-aggregator-informers ok",