kube-apiserver: rewire service controllers: kubernetesservice + IP repair

This commit is contained in:
Dr. Stefan Schimanski 2023-07-04 12:58:21 +02:00
parent c733c57962
commit 75e3576523
No known key found for this signature in database
GPG Key ID: 4C68E0F19F95EC33
5 changed files with 212 additions and 80 deletions

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net"
"net/http"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
@ -47,6 +48,9 @@ type Controller struct {
Config
client kubernetes.Interface
lock sync.Mutex
stopCh chan struct{} // closed by Stop()
}
type Config struct {
@ -67,6 +71,7 @@ func New(config Config, client kubernetes.Interface) *Controller {
return &Controller{
Config: config,
client: client,
stopCh: make(chan struct{}),
}
}
@ -81,11 +86,30 @@ func (c *Controller) Start(stopCh <-chan struct{}) {
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
}
go c.Run(stopCh)
localStopCh := make(chan struct{})
go func() {
defer close(localStopCh)
select {
case <-stopCh: // from Start
case <-c.stopCh: // from Stop
}
}()
go c.Run(localStopCh)
}
// Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
func (c *Controller) Stop() {
c.lock.Lock()
defer c.lock.Unlock()
select {
case <-c.stopCh:
return // only close once
default:
close(c.stopCh)
}
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
finishedReconciling := make(chan struct{})
go func() {

View File

@ -403,6 +403,28 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// TODO: update to a version that caches success but will recheck on failure, unlike memcache discovery
discoveryClientForAdmissionRegistration := clientset.Discovery()
legacyRESTStorageProvider, err := corerest.New(corerest.Config{
GenericConfig: corerest.GenericConfig{
StorageFactory: c.ExtraConfig.StorageFactory,
EventTTL: c.ExtraConfig.EventTTL,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
Informers: c.ExtraConfig.VersionedInformers,
},
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
ServiceSecondaryIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
ServiceIPRepairInterval: c.ExtraConfig.RepairServicesInterval,
})
if err != nil {
return nil, err
}
// The order here is preserved in discovery.
// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
@ -411,23 +433,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
restStorageProviders := []RESTStorageProvider{
corerest.LegacyRESTStorageProvider{
GenericLegacyRESTStorageProvider: corerest.GenericLegacyRESTStorageProvider{
StorageFactory: c.ExtraConfig.StorageFactory,
EventTTL: c.ExtraConfig.EventTTL,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer,
ExtendExpiration: c.ExtraConfig.ExtendExpiration,
ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
APIAudiences: c.GenericConfig.Authentication.APIAudiences,
Informers: c.ExtraConfig.VersionedInformers,
},
ProxyTransport: c.ExtraConfig.ProxyTransport,
KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
ServiceIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
},
legacyRESTStorageProvider,
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
@ -459,6 +465,30 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
kubernetesServiceCtrl := kubernetesservice.New(kubernetesservice.Config{
PublicIP: c.GenericConfig.PublicAddress,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
}, clientset)
m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubernetesServiceCtrl.Start(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
kubernetesServiceCtrl.Stop()
return nil
})
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, clientset)
@ -584,40 +614,6 @@ func labelAPIServerHeartbeatFunc(identity string) lease.ProcessLeaseFunc {
}
}
// newKubernetesServiceControllerConfig returns a configuration for the kubernetes service controller.
func (c completedConfig) newKubernetesServiceControllerConfig(client kubernetes.Interface) (*kubernetesservice.Config, error) {
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
return &kubernetesservice.Config{
Client: client,
Informers: c.ExtraConfig.VersionedInformers,
KubernetesService: kubernetesservice.KubernetesService{
PublicIP: c.GenericConfig.PublicAddress,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
},
ClusterIP: kubernetesservice.ClusterIP{
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceClusterIPInterval: c.ExtraConfig.RepairServicesInterval,
},
NodePort: kubernetesservice.NodePort{
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
ServiceNodePortInterval: c.ExtraConfig.RepairServicesInterval,
},
}, nil
}
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string
@ -667,10 +663,12 @@ func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResource
}
if len(groupName) == 0 {
// the legacy group for core APIs is special that it is installed into /api via this special install method.
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering legacy API: %w", err)
}
} else {
// everything else goes to /apis
nonLegacy = append(nonLegacy, &apiGroupInfo)
}
}

View File

@ -152,8 +152,8 @@ func TestLegacyRestStorageStrategies(t *testing.T) {
_, etcdserver, apiserverCfg, _ := newInstance(t)
defer etcdserver.Terminate(t)
storageProvider := corerest.LegacyRESTStorageProvider{
GenericLegacyRESTStorageProvider: corerest.GenericLegacyRESTStorageProvider{
storageProvider, err := corerest.New(corerest.Config{
GenericConfig: corerest.GenericConfig{
StorageFactory: apiserverCfg.ExtraConfig.StorageFactory,
EventTTL: apiserverCfg.ExtraConfig.EventTTL,
LoopbackClientConfig: apiserverCfg.GenericConfig.LoopbackClientConfig,
@ -163,6 +163,9 @@ func TestLegacyRestStorageStrategies(t *testing.T) {
KubeletClientConfig: apiserverCfg.ExtraConfig.KubeletClientConfig,
ServiceIPRange: apiserverCfg.ExtraConfig.ServiceIPRange,
ServiceNodePortRange: apiserverCfg.ExtraConfig.ServiceNodePortRange,
})
if err != nil {
t.Fatalf("unexpected error from REST storage: %v", err)
}
apiGroupInfo, err := storageProvider.NewRESTStorage(serverstorage.NewResourceConfig(), apiserverCfg.GenericConfig.RESTOptionsGetter)

View File

@ -18,9 +18,11 @@ package rest
import (
"crypto/tls"
goerrors "errors"
"fmt"
"net"
"net/http"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
@ -33,6 +35,7 @@ import (
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
restclient "k8s.io/client-go/rest"
@ -60,16 +63,19 @@ import (
"k8s.io/kubernetes/pkg/registry/core/service/allocator"
serviceallocator "k8s.io/kubernetes/pkg/registry/core/service/allocator/storage"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
serviceipallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
"k8s.io/kubernetes/pkg/registry/core/service/portallocator"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
servicestore "k8s.io/kubernetes/pkg/registry/core/service/storage"
serviceaccountstore "k8s.io/kubernetes/pkg/registry/core/serviceaccount/storage"
kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/async"
)
// GenericLegacyRESTStorageProvider provides information needed to build RESTStorage
// GenericConfig provides information needed to build RESTStorage
// for generic resources in core. It implements the "normal" RESTStorageProvider interface.
type GenericLegacyRESTStorageProvider struct {
type GenericConfig struct {
StorageFactory serverstorage.StorageFactory
EventTTL time.Duration
@ -83,21 +89,20 @@ type GenericLegacyRESTStorageProvider struct {
Informers informers.SharedInformerFactory
}
// LegacyRESTStorageProvider provides information needed to build RESTStorage for core, but
// does NOT implement the "normal" RESTStorageProvider (yet!)
type LegacyRESTStorageProvider struct {
GenericLegacyRESTStorageProvider
// Config provides information needed to build RESTStorage for core.
type Config struct {
GenericConfig
// Used for custom proxy dialing, and proxy TLS options
ProxyTransport http.RoundTripper
KubeletClientConfig kubeletclient.KubeletClientConfig
// ServiceIPRange is used to build cluster IPs for discovery.
ServiceIPRange net.IPNet
// allocates ips for secondary service cidr in dual stack clusters
SecondaryServiceIPRange net.IPNet
// Service IP ranges
ServiceIPRange net.IPNet
ServiceSecondaryIPRange net.IPNet
ServiceNodePortRange utilnet.PortRange
ServiceIPRepairInterval time.Duration
}
type rangeRegistries struct {
@ -106,7 +111,63 @@ type rangeRegistries struct {
nodePort rangeallocation.RangeRegistry
}
func (c GenericLegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
type legacyProvider struct {
Config
primaryServiceClusterIPAllocator ipallocator.Interface
serviceClusterIPAllocators map[api.IPFamily]ipallocator.Interface
serviceNodePortAllocator *portallocator.PortAllocator
startServiceNodePortsRepair, startServiceClusterIPRepair func(onFirstSuccess func(), stopCh chan struct{})
}
func New(c Config) (*legacyProvider, error) {
rangeRegistries, serviceClusterIPAllocator, serviceIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
if err != nil {
return nil, err
}
p := &legacyProvider{
Config: c,
primaryServiceClusterIPAllocator: serviceClusterIPAllocator,
serviceClusterIPAllocators: serviceIPAllocators,
serviceNodePortAllocator: serviceNodePortAllocator,
}
// create service node port repair controller
client, err := kubernetes.NewForConfig(c.LoopbackClientConfig)
if err != nil {
return nil, err
}
p.startServiceNodePortsRepair = portallocatorcontroller.NewRepair(c.ServiceIPRepairInterval, client.CoreV1(), client.EventsV1(), c.ServiceNodePortRange, rangeRegistries.nodePort).RunUntil
// create service cluster ip repair controller
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepair(
c.ServiceIPRepairInterval,
client.CoreV1(),
client.EventsV1(),
&c.ServiceIPRange,
rangeRegistries.clusterIP,
&c.ServiceSecondaryIPRange,
rangeRegistries.secondaryClusterIP,
).RunUntil
} else {
p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepairIPAddress(
c.ServiceIPRepairInterval,
client,
&c.ServiceIPRange,
&c.ServiceSecondaryIPRange,
c.Informers.Core().V1().Services(),
c.Informers.Networking().V1alpha1().IPAddresses(),
).RunUntil
}
return p, nil
}
func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
@ -187,8 +248,12 @@ func (c GenericLegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource
return apiGroupInfo, nil
}
func (c LegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
apiGroupInfo, err := c.GenericLegacyRESTStorageProvider.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
func (c *GenericConfig) GroupName() string {
return api.GroupName
}
func (c *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error) {
apiGroupInfo, err := c.GenericConfig.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
@ -237,15 +302,11 @@ func (c LegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource server
return genericapiserver.APIGroupInfo{}, err
}
_, primaryServiceClusterIPAllocator, serviceClusterIPAllocators, serviceNodePortAllocator, err := c.newServiceIPAllocators()
if err != nil {
return genericapiserver.APIGroupInfo{}, err
}
serviceRESTStorage, serviceStatusStorage, serviceRESTProxy, err := servicestore.NewREST(
restOptionsGetter,
primaryServiceClusterIPAllocator.IPFamily(),
serviceClusterIPAllocators,
serviceNodePortAllocator,
c.primaryServiceClusterIPAllocator.IPFamily(),
c.serviceClusterIPAllocators,
c.serviceNodePortAllocator,
endpointsStorage,
podStorage.Pod,
c.ProxyTransport)
@ -357,7 +418,7 @@ func (c LegacyRESTStorageProvider) NewRESTStorage(apiResourceConfigSource server
return apiGroupInfo, nil
}
func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries rangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
func (c *Config) newServiceIPAllocators() (registries rangeRegistries, primaryClusterIPAllocator ipallocator.Interface, clusterIPAllocators map[api.IPFamily]ipallocator.Interface, nodePortAllocator *portallocator.PortAllocator, err error) {
clusterIPAllocators = map[api.IPFamily]ipallocator.Interface{}
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
@ -399,10 +460,10 @@ func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries rangeReg
clusterIPAllocators[primaryClusterIPAllocator.IPFamily()] = primaryClusterIPAllocator
var secondaryClusterIPAllocator ipallocator.Interface
if c.SecondaryServiceIPRange.IP != nil {
if c.ServiceSecondaryIPRange.IP != nil {
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
var err error
secondaryClusterIPAllocator, err = ipallocator.New(&c.SecondaryServiceIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
secondaryClusterIPAllocator, err = ipallocator.New(&c.ServiceSecondaryIPRange, func(max int, rangeSpec string, offset int) (allocator.Interface, error) {
var mem allocator.Snapshottable
mem = allocator.NewAllocationMapWithOffset(max, rangeSpec, offset)
// TODO etcdallocator package to return a storage interface via the storageFactory
@ -421,7 +482,7 @@ func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries rangeReg
if err != nil {
return rangeRegistries{}, nil, nil, nil, err
}
secondaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
secondaryClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.ServiceSecondaryIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
if err != nil {
return rangeRegistries{}, nil, nil, nil, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
}
@ -448,7 +509,50 @@ func (c LegacyRESTStorageProvider) newServiceIPAllocators() (registries rangeReg
return
}
func (p LegacyRESTStorageProvider) GroupName() string {
var _ genericapiserver.PostStartHookProvider = &legacyProvider{}
func (p *legacyProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return "start-service-ip-repair-controllers", func(context genericapiserver.PostStartHookContext) error {
// We start both repairClusterIPs and repairNodePorts to ensure repair
// loops of ClusterIPs and NodePorts.
// We run both repair loops using RunUntil public interface.
// However, we want to fail liveness/readiness until the first
// successful repair loop, so we basically pass appropriate
// callbacks to RunUtil methods.
// Additionally, we ensure that we don't wait for it for longer
// than 1 minute for backward compatibility of failing the whole
// apiserver if we can't repair them.
wg := sync.WaitGroup{}
wg.Add(2)
runner := async.NewRunner(
func(stopCh chan struct{}) { p.startServiceClusterIPRepair(wg.Done, stopCh) },
func(stopCh chan struct{}) { p.startServiceNodePortsRepair(wg.Done, stopCh) },
)
runner.Start()
go func() {
defer runner.Stop()
<-context.StopCh
}()
// For backward compatibility, we ensure that if we never are able
// to repair clusterIPs and/or nodeports, we not only fail the liveness
// and/or readiness, but also explicitly fail.
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-done:
case <-time.After(time.Minute):
return goerrors.New("unable to perform initial IP and Port allocation check")
}
return nil
}, nil
}
func (p *legacyProvider) GroupName() string {
return api.GroupName
}

View File

@ -44,6 +44,7 @@ var (
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/start-service-ip-repair-controllers ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
@ -64,6 +65,7 @@ var (
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/start-service-ip-repair-controllers ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
@ -85,6 +87,7 @@ var (
"[+]poststarthook/crd-informer-synced ok",
"[+]poststarthook/bootstrap-controller ok",
"[+]poststarthook/start-system-namespaces-controller ok",
"[+]poststarthook/start-service-ip-repair-controllers ok",
"[+]poststarthook/scheduling/bootstrap-system-priority-classes ok",
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",