Merge pull request #119042 from sttts/sttts-restcore-split

cmd/kube-apiserver: turn core (legacy) rest storage into standard RESTStorageProvider
Kubernetes Prow Robot 2023-07-12 03:35:17 -07:00 committed by GitHub
commit a8093823c3
6 changed files with 303 additions and 286 deletions
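For orientation, this is the RESTStorageProvider contract that the core (legacy) storage is converted to implement. The interface definition is cut short by a hunk below, so it is restated here; the NewRESTStorage signature is taken from the corerest provider methods shown further down, not copied verbatim from the patch:

// RESTStorageProvider is a factory type for REST storage. After this change,
// the core ("") group is just another provider in the restStorageProviders list.
type RESTStorageProvider interface {
	// GroupName returns the API group this provider serves ("" for core).
	GroupName() string
	// NewRESTStorage builds the APIGroupInfo carrying the group's storage map.
	NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error)
}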

View File

@@ -28,21 +28,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
"k8s.io/kubernetes/pkg/util/async"
)
const (
@@ -54,27 +46,14 @@ const (
// provide the IP repair check on service IPs
type Controller struct {
Config
RangeRegistries
runner *async.Runner
}
client kubernetes.Interface
type RangeRegistries struct {
ServiceClusterIPRegistry rangeallocation.RangeRegistry
SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceNodePortRegistry rangeallocation.RangeRegistry
lock sync.Mutex
stopCh chan struct{} // closed by Stop()
}
type Config struct {
Client kubernetes.Interface
Informers informers.SharedInformerFactory
KubernetesService
ClusterIP
NodePort
}
type KubernetesService struct {
PublicIP net.IP
EndpointReconciler reconcilers.EndpointReconciler
@@ -87,32 +66,18 @@ type KubernetesService struct {
KubernetesServiceNodePort int
}
type ClusterIP struct {
ServiceClusterIPRange net.IPNet
SecondaryServiceClusterIPRange net.IPNet
ServiceClusterIPInterval time.Duration
}
type NodePort struct {
ServiceNodePortInterval time.Duration
ServiceNodePortRange utilnet.PortRange
}
// New returns a controller for watching the kubernetes service endpoints.
func New(config Config, rangeRegistries RangeRegistries) (*Controller, error) {
func New(config Config, client kubernetes.Interface) *Controller {
return &Controller{
Config: config,
RangeRegistries: rangeRegistries,
}, nil
Config: config,
client: client,
stopCh: make(chan struct{}),
}
}
// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start() {
if c.runner != nil {
return
}
func (c *Controller) Start(stopCh <-chan struct{}) {
// Reconcile during the first run, removing this server's endpoints until it is ready.
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
@@ -121,72 +86,30 @@ func (c *Controller) Start() {
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
}
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.Client.CoreV1(), c.Client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
// We start both repairClusterIPs and repairNodePorts to ensure the repair
// loops for ClusterIPs and NodePorts are running.
// We run both repair loops via the public RunUntil interface.
// However, we want to fail liveness/readiness until the first
// successful repair loop, so we pass appropriate
// callbacks to the RunUntil methods.
// Additionally, for backward compatibility we wait no longer
// than 1 minute before failing the whole apiserver
// if the repairs cannot complete.
wg := sync.WaitGroup{}
wg.Add(1)
runRepairNodePorts := func(stopCh chan struct{}) {
repairNodePorts.RunUntil(wg.Done, stopCh)
}
wg.Add(1)
var runRepairClusterIPs func(stopCh chan struct{})
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval,
c.Client.CoreV1(),
c.Client.EventsV1(),
&c.ServiceClusterIPRange,
c.ServiceClusterIPRegistry,
&c.SecondaryServiceClusterIPRange,
c.SecondaryServiceClusterIPRegistry)
runRepairClusterIPs = func(stopCh chan struct{}) {
repairClusterIPs.RunUntil(wg.Done, stopCh)
}
} else {
repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval,
c.Client,
&c.ServiceClusterIPRange,
&c.SecondaryServiceClusterIPRange,
c.Informers.Core().V1().Services(),
c.Informers.Networking().V1alpha1().IPAddresses(),
)
runRepairClusterIPs = func(stopCh chan struct{}) {
repairClusterIPs.RunUntil(wg.Done, stopCh)
}
}
c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
c.runner.Start()
// For backward compatibility, we ensure that if we are never able
// to repair clusterIPs and/or nodeports, we not only fail the liveness
// and/or readiness checks, but also fail explicitly.
done := make(chan struct{})
localStopCh := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
defer close(localStopCh)
select {
case <-stopCh: // from Start
case <-c.stopCh: // from Stop
}
}()
select {
case <-done:
case <-time.After(time.Minute):
klog.Fatalf("Unable to perform initial IP and Port allocation check")
}
go c.Run(localStopCh)
}
// Stop cleans up this API Server's endpoint reconciliation leases so another master can take over more quickly.
func (c *Controller) Stop() {
if c.runner != nil {
c.runner.Stop()
c.lock.Lock()
defer c.lock.Unlock()
select {
case <-c.stopCh:
return // only close once
default:
close(c.stopCh)
}
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
finishedReconciling := make(chan struct{})
go func() {
@@ -208,12 +131,12 @@ func (c *Controller) Stop() {
}
}
// RunKubernetesService periodically updates the kubernetes service
func (c *Controller) RunKubernetesService(ch chan struct{}) {
// Run periodically updates the kubernetes service
func (c *Controller) Run(ch <-chan struct{}) {
// wait until process is ready
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.Client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil
}, ch)
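The readiness gate above polls the loopback /readyz endpoint every 100ms until it returns 200 OK. The same idiom in isolation, as a minimal sketch (the package, waitReady, and fn names are illustrative, not from the patch):

package readiness

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitReady polls fn every 100ms until it reports ready or stopCh closes,
// mirroring the /readyz gate in Run above; fn stands in for the loopback
// GET /readyz status-code check.
func waitReady(fn func() bool, stopCh <-chan struct{}) error {
	return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
		return fn(), nil
	}, stopCh)
}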
@@ -233,8 +156,8 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master.
if _, err := c.Client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
if _, err := c.Client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: metav1.NamespaceDefault,
Namespace: "",
@@ -286,12 +209,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
if s, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
// The service already exists.
if reconcile {
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
return err
}
}
@@ -315,7 +238,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
},
}
_, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if errors.IsAlreadyExists(err) {
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
}
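Worth noting in the rewritten Stop: the guarded close of c.stopCh (mutex plus non-blocking receive) makes repeated Stop calls safe, so the pre-shutdown hook and the Start-side goroutine cannot trigger a double-close panic. A standalone sketch of just that pattern, with illustrative names:

package main

import (
	"fmt"
	"sync"
)

// stopper demonstrates the close-once pattern used by Controller.Stop above:
// the mutex serializes callers and the select skips an already-closed channel.
type stopper struct {
	lock   sync.Mutex
	stopCh chan struct{}
}

func newStopper() *stopper { return &stopper{stopCh: make(chan struct{})} }

func (s *stopper) Stop() {
	s.lock.Lock()
	defer s.lock.Unlock()
	select {
	case <-s.stopCh:
		return // already closed; closing again would panic
	default:
		close(s.stopCh)
	}
}

func main() {
	s := newStopper()
	s.Stop()
	s.Stop() // safe no-op on the second call
	fmt.Println("stopped")
}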

View File

@@ -67,7 +67,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range createTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
master.Client = fakeClient
master.client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
@@ -349,7 +349,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range reconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.Client = fakeClient
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
@@ -408,7 +408,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
for _, test := range nonReconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.Client = fakeClient
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)

View File

@@ -395,12 +395,6 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
}
// install legacy rest storage
if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter); err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(c.GenericConfig.LoopbackClientConfig)
if err != nil {
return nil, err
@@ -409,6 +403,32 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// TODO: update to a version that caches success but will recheck on failure, unlike memcache discovery
discoveryClientForAdmissionRegistration := clientset.Discovery()
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
GenericConfig: corerest.GenericConfig{
StorageFactory: c.ExtraConfig.StorageFactory,
EventTTL: c.ExtraConfig.EventTTL,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
Informers: c.ExtraConfig.VersionedInformers,
},
Proxy: corerest.ProxyConfig{
Transport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
},
Services: corerest.ServicesConfig{
ClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
NodePortRange: c.ExtraConfig.ServiceNodePortRange,
IPRepairInterval: c.ExtraConfig.RepairServicesInterval,
},
})
if err != nil {
return nil, err
}
// The order here is preserved in discovery.
// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps" and "deployments.extensions"),
// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
@@ -417,6 +437,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
restStorageProviders := []RESTStorageProvider{
legacyRESTStorageProvider,
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
@@ -443,12 +464,37 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, err
}
m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go systemnamespaces.NewController(clientset, c.ExtraConfig.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
return nil
})
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
kubernetesServiceCtrl := kubernetesservice.New(kubernetesservice.Config{
PublicIP: c.GenericConfig.PublicAddress,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
}, clientset)
m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubernetesServiceCtrl.Start(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
kubernetesServiceCtrl.Stop()
return nil
})
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return err
}
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, clientset)
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
@@ -572,93 +618,6 @@ func labelAPIServerHeartbeatFunc(identity string) lease.ProcessLeaseFunc {
}
}
// InstallLegacyAPI will install the legacy APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
GenericLegacyRESTStorageProvider: corerest.GenericLegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
EventTTL: c.ExtraConfig.EventTTL,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
Informers: c.ExtraConfig.VersionedInformers,
},
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
}
rangeRegistries, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
if err != nil {
return fmt.Errorf("error building core storage: %v", err)
}
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 { // if all core storage is disabled, return.
return nil
}
controllerName := "bootstrap-controller"
client := kubernetes.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
// Kubernetes clusters contain the following system namespaces:
// kube-system, kube-node-lease, kube-public, default
m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go systemnamespaces.NewController(client, c.ExtraConfig.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
return nil
})
kubenetesserviceConfig, err := c.newKubernetesServiceControllerConfig(client)
if err != nil {
return err
}
bootstrapController, err := kubernetesservice.New(*kubenetesserviceConfig, rangeRegistries)
if err != nil {
return fmt.Errorf("error creating bootstrap controller: %v", err)
}
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, func(genericapiserver.PostStartHookContext) error { bootstrapController.Start(); return nil })
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, func() error { bootstrapController.Stop(); return nil })
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
}
// newKubernetesServiceControllerConfig returns a configuration for the kubernetes service controller.
func (c completedConfig) newKubernetesServiceControllerConfig(client kubernetes.Interface) (*kubernetesservice.Config, error) {
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
return &kubernetesservice.Config{
Client: client,
Informers: c.ExtraConfig.VersionedInformers,
KubernetesService: kubernetesservice.KubernetesService{
PublicIP: c.GenericConfig.PublicAddress,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
},
ClusterIP: kubernetesservice.ClusterIP{
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceClusterIPInterval: c.ExtraConfig.RepairServicesInterval,
},
NodePort: kubernetesservice.NodePort{
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
ServiceNodePortInterval: c.ExtraConfig.RepairServicesInterval,
},
}, nil
}
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string
@@ -667,7 +626,7 @@ type RESTStorageProvider interface {
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
apiGroupsInfo := []*genericapiserver.APIGroupInfo{}
nonLegacy := []*genericapiserver.APIGroupInfo{}
// used later in the loop to filter the served resources by those that have expired.
resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*m.GenericAPIServer.Version)
@@ -707,10 +666,18 @@ func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResource
m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
}
apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
if len(groupName) == 0 {
// the legacy group for core APIs is special in that it is installed into /api via this special install method.
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering legacy API: %w", err)
}
} else {
// everything else goes to /apis
nonLegacy = append(nonLegacy, &apiGroupInfo)
}
}
if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
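The net effect of the new branch in InstallAPIs is that a provider's group name selects its install prefix, which is what lets the dedicated InstallLegacyAPI path above be deleted. A hypothetical helper condensing that decision (prefixForGroup does not exist in the patch):

package main

import "fmt"

// prefixForGroup mirrors the routing in InstallAPIs: the empty-named core
// group is served under /api (genericapiserver.DefaultLegacyAPIPrefix),
// every other group under /apis/<group>.
func prefixForGroup(groupName string) string {
	if groupName == "" {
		return "/api"
	}
	return "/apis/" + groupName
}

func main() {
	fmt.Println(prefixForGroup(""))     // /api
	fmt.Println(prefixForGroup("apps")) // /apis/apps
}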

View File

@@ -152,20 +152,27 @@ func TestLegacyRestStorageStrategies(t *testing.T) {
_, etcdserver, apiserverCfg, _ := newInstance(t)
defer etcdserver.Terminate(t)
storageProvider := corerest.LegacyRESTStorageProvider{
GenericLegacyRESTStorageProvider: corerest.GenericLegacyRESTStorageProvider{
storageProvider, err := corerest.New(corerest.Config{
GenericConfig: corerest.GenericConfig{
StorageFactory: apiserverCfg.ExtraConfig.StorageFactory,
EventTTL: apiserverCfg.ExtraConfig.EventTTL,
LoopbackClientConfig: apiserverCfg.GenericConfig.LoopbackClientConfig,
Informers: apiserverCfg.ExtraConfig.VersionedInformers,
},
ProxyTransport: apiserverCfg.ExtraConfig.ProxyTransport,
KubeletClientConfig: apiserverCfg.ExtraConfig.KubeletClientConfig,
ServiceIPRange: apiserverCfg.ExtraConfig.ServiceIPRange,
ServiceNodePortRange: apiserverCfg.ExtraConfig.ServiceNodePortRange,
Proxy: corerest.ProxyConfig{
Transport: apiserverCfg.ExtraConfig.ProxyTransport,
KubeletClientConfig: apiserverCfg.ExtraConfig.KubeletClientConfig,
},
Services: corerest.ServicesConfig{
ClusterIPRange: apiserverCfg.ExtraConfig.ServiceIPRange,
NodePortRange: apiserverCfg.ExtraConfig.ServiceNodePortRange,
},
})
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
_, apiGroupInfo, err := storageProvider.NewLegacyRESTStorage(serverstorage.NewResourceConfig(), apiserverCfg.GenericConfig.RESTOptionsGetter)
apiGroupInfo, err := storageProvider.NewRESTStorage(serverstorage.NewResourceConfig(), apiserverCfg.GenericConfig.RESTOptionsGetter)
if err != nil {
t.Errorf("failed to create legacy REST storage: %v", err)
}

View File

@@ -18,9 +18,11 @@ package rest
import (
"crypto/tls"
goerrors "errors"
"fmt"
"net"
"net/http"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
@@ -33,13 +35,14 @@ import (
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice"
"k8s.io/kubernetes/pkg/features"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/core/componentstatus"
@@ -53,23 +56,26 @@ import (
pvcstore "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/storage"
podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage"
podtemplatestore "k8s.io/kubernetes/pkg/registry/core/podtemplate/storage"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
controllerstore "k8s.io/kubernetes/pkg/registry/core/replicationcontroller/storage"
resourcequotastore "k8s.io/kubernetes/pkg/registry/core/resourcequota/storage"
secretstore "k8s.io/kubernetes/pkg/registry/core/secret/storage"
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
serviceallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
serviceipallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
servicestore "k8s.io/kubernetes/pkg/registry/core/service/storage"
serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/async"
)
// GenericLegacyRESTStorageProvider provides information needed to build RESTStorage
// for generic resources in core, but does NOT implement the "normal"
// RESTStorageProvider (yet!)
type GenericLegacyRESTStorageProvider struct {
// GenericConfig provides information needed to build RESTStorage
// for generic resources in core. It implements the "normal" RESTStorageProvider interface.
type GenericConfig struct {
StorageFactory serverstorage.StorageFactory
EventTTL time.Duration
@@ -83,24 +89,91 @@ type GenericLegacyRESTStorageProvider struct {
Informers informers.SharedInformerFactory
}
// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
// does NOT implement the "normal" RESTStorageProvider (yet!)
type LegacyRESTStorageProvider struct {
GenericLegacyRESTStorageProvider
// Config provides information needed to build RESTStorage for core.
type Config struct {
GenericConfig
// Used for custom proxy dialing, and proxy TLS options
ProxyTransport http.RoundTripper
KubeletClientConfig kubeletclient.KubeletClientConfig
// ServiceIPRange is used to build cluster IPs for discovery.
ServiceIPRange net.IPNet
// allocates IPs for the secondary service CIDR in dual-stack clusters
SecondaryServiceIPRange net.IPNet
ServiceNodePortRange utilnet.PortRange
Proxy ProxyConfig
Services ServicesConfig
}
func (c GenericLegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
type ProxyConfig struct {
Transport http.RoundTripper
KubeletClientConfig kubeletclient.KubeletClientConfig
}
type ServicesConfig struct {
// Service IP ranges
ClusterIPRange net.IPNet
SecondaryClusterIPRange net.IPNet
NodePortRange utilnet.PortRange
IPRepairInterval time.Duration
}
type rangeRegistries struct {
clusterIP rangeallocation.RangeRegistry
secondaryClusterIP rangeallocation.RangeRegistry
nodePort rangeallocation.RangeRegistry
}
type legacyProvider struct {
Config
primaryServiceClusterIPAllocator ipallocator.Interface
serviceClusterIPAllocators map[api.IPFamily]ipallocator.Interface
serviceNodePortAllocator *portallocator.PortAllocator
startServiceNodePortsRepair, startServiceClusterIPRepair func(onFirstSuccess func(), stopCh chan struct{})
}
func New(c Config) (*legacyProvider, error) {
rangeRegistries, serviceClusterIPAllocator, serviceIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
if err != nil {
return nil, err
}
p := &legacyProvider{
Config: c,
primaryServiceClusterIPAllocator: serviceClusterIPAllocator,
serviceClusterIPAllocators: serviceIPAllocators,
serviceNodePortAllocator: serviceNodePortAllocator,
}
// create service node port repair controller
client, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return nil, err
}
p.startServiceNodePortsRepair = portallocatorcontroller.NewRepair(c.Services.IPRepairInterval, client.CoreV1(), client.EventsV1(), c.Services.NodePortRange, rangeRegistries.nodePort).RunUntil
// create service cluster ip repair controller
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepair(
c.Services.IPRepairInterval,
client.CoreV1(),
client.EventsV1(),
&c.Services.ClusterIPRange,
rangeRegistries.clusterIP,
&c.Services.SecondaryClusterIPRange,
rangeRegistries.secondaryClusterIP,
).RunUntil
} else {
p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepairIPAddress(
c.Services.IPRepairInterval,
client,
&c.Services.ClusterIPRange,
&c.Services.SecondaryClusterIPRange,
c.Informers.Core().V1().Services(),
c.Informers.Networking().V1alpha1().IPAddresses(),
).RunUntil
}
return p, nil
}
func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
@@ -180,70 +253,71 @@ func (c GenericLegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource
return apiGroupInfo, nil
}
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (kubernetesservice.RangeRegistries, genericapiserver.APIGroupInfo, error) {
apiGroupInfo, err := c.GenericLegacyRESTStorageProvider.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
func (c *GenericConfig) GroupName() string {
return api.GroupName
}
func (c *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
apiGroupInfo, err := c.GenericConfig.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
podDisruptionClient, err := policyclient.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.Proxy.KubeletClientConfig, c.Proxy.Transport)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
podStorage, err := podstore.NewStorage(
restOptionsGetter,
nodeStorage.KubeletConnectionInfo,
c.ProxyTransport,
c.Proxy.Transport,
podDisruptionClient,
)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
rangeRegistries, primaryServiceClusterIPAllocator, serviceClusterIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
}
serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST(
restOptionsGetter,
primaryServiceClusterIPAllocator.IPFamily(),
serviceClusterIPAllocators,
serviceNodePortAllocator,
c.primaryServiceClusterIPAllocator.IPFamily(),
c.serviceClusterIPAllocators,
c.serviceNodePortAllocator,
endpointsStorage,
podStorage.Pod,
c.ProxyTransport)
c.Proxy.Transport)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
storage := apiGroupInfo.VersionedResourcesStorageMap["v1"]
@@ -256,7 +330,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
if c.ServiceAccountIssuer != nil {
serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, storage["secrets"].(rest.Getter), c.ExtendExpiration)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
}
@@ -285,7 +359,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
if resource := "replicationcontrollers"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
if err != nil {
return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
return genericapiserver.APIGroupInfo{}, err
}
storage[resource] = controllerStorage.Controller
@@ -347,20 +421,20 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
}
return rangeRegistries, apiGroupInfo, nil
return apiGroupInfo, nil
}
func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries kubernetesservice.RangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
clusterIPAllocators = map[api.IPFamily]ipallocator.Interface{}
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
return rangeRegistries{}, nil, nil, nil, err
}
serviceClusterIPRange := c.ServiceIPRange
serviceClusterIPRange := c.Services.ClusterIPRange
if serviceClusterIPRange.IP == nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("service clusterIPRange is missing")
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("service clusterIPRange is missing")
}
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
@@ -372,30 +446,30 @@ func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries kubernet
if err != nil {
return nil, err
}
registries.ServiceClusterIPRegistry = etcd
registries.clusterIP = etcd
return etcd, nil
})
if err != nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
} else {
networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
return rangeRegistries{}, nil, nil, nil, err
}
primaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&serviceClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
if err != nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster IP allocator: %v", err)
}
}
primaryClusterIPAllocator.EnableMetrics()
clusterIPAllocators[primaryClusterIPAllocator.IPFamily()] = primaryClusterIPAllocator
var secondaryClusterIPAllocator ipallocator.Interface
if c.SecondaryServiceIPRange.IP != nil {
if c.Services.SecondaryClusterIPRange.IP != nil {
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
var err error
secondaryClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
secondaryClusterIPAllocator, err = ipallocator.New(&c.Services.SecondaryClusterIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
var mem allocator.Snapshottable
mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO etcdallocator package to return a storage interface via the storageFactory
@@ -403,45 +477,88 @@ func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries kubernet
if err != nil {
return nil, err
}
registries.SecondaryServiceClusterIPRegistry = etcd
registries.secondaryClusterIP = etcd
return etcd, nil
})
if err != nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
}
} else {
networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, err
return rangeRegistries{}, nil, nil, nil, err
}
secondaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
secondaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.Services.SecondaryClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
if err != nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
}
}
secondaryClusterIPAllocator.EnableMetrics()
clusterIPAllocators[secondaryClusterIPAllocator.IPFamily()] = secondaryClusterIPAllocator
}
nodePortAllocator, err = portallocator.New(c.ServiceNodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
nodePortAllocator, err = portallocator.New(c.Services.NodePortRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
mem := allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO etcdallocator package to return a storage interface via the storageFactory
etcd, err := serviceallocator.NewEtcd(mem, "/ranges/servicenodeports", serviceStorageConfig.ForResource(api.Resource("servicenodeportallocations")))
if err != nil {
return nil, err
}
registries.ServiceNodePortRegistry = etcd
registries.nodePort = etcd
return etcd, nil
})
if err != nil {
return kubernetesservice.RangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster port allocator: %v", err)
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster port allocator: %v", err)
}
nodePortAllocator.EnableMetrics()
return
}
func (p LegacyRESTStorageProvider) GroupName() string {
var _ genericapiserver.PostStartHookProvider = &legacyProvider{}
func (p *legacyProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return "start-service-ip-repair-controllers", func(context genericapiserver.PostStartHookContext) error {
// We start both repairClusterIPs and repairNodePorts to ensure the repair
// loops for ClusterIPs and NodePorts are running.
// We run both repair loops via the public RunUntil interface.
// However, we want to fail liveness/readiness until the first
// successful repair loop, so we pass appropriate
// callbacks to the RunUntil methods.
// Additionally, for backward compatibility we wait no longer
// than 1 minute before failing the whole apiserver
// if the repairs cannot complete.
wg := sync.WaitGroup{}
wg.Add(2)
runner := async.NewRunner(
func(stopCh chan struct{}) { p.startServiceClusterIPRepair(wg.Done, stopCh) },
func(stopCh chan struct{}) { p.startServiceNodePortsRepair(wg.Done, stopCh) },
)
runner.Start()
go func() {
defer runner.Stop()
<-context.StopCh
}()
// For backward compatibility, we ensure that if we are never able
// to repair clusterIPs and/or nodeports, we not only fail the liveness
// and/or readiness checks, but also fail explicitly.
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-done:
case <-time.After(time.Minute):
return goerrors.New("unable to perform initial IP and Port allocation check")
}
return nil
}, nil
}
func (p *legacyProvider) GroupName() string {
return api.GroupName
}
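The PostStartHook above turns what Start used to do with klog.Fatalf into a returned error, while keeping the one-minute bound on the first successful repair run. The wait-with-deadline idiom in isolation (a generic sketch; names are illustrative, not from the patch):

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// waitFirstSuccess blocks until wg is done (each repair loop calls wg.Done on
// its first success) or the deadline elapses, mirroring the hook's 1-minute bound.
func waitFirstSuccess(wg *sync.WaitGroup, d time.Duration) error {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		return nil
	case <-time.After(d):
		return errors.New("unable to perform initial IP and Port allocation check")
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { time.Sleep(10 * time.Millisecond); wg.Done() }() // cluster IP repair
	go func() { time.Sleep(20 * time.Millisecond); wg.Done() }() // node port repair
	fmt.Println(waitFirstSuccess(&wg, time.Minute)) // <nil>
}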

View File

@@ -44,6 +44,7 @@ var (
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/start-service-ip-repair-controllers ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
@@ -64,6 +65,7 @@ var (
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/start-service-ip-repair-controllers ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
@@ -85,6 +87,7 @@ var (
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/start-service-ip-repair-controllers ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",