Merge pull request #120202 from sttts/sttts-controlplane-config-split

Step 2 – generic controlplane: split server
This commit is contained in:
Kubernetes Prow Robot 2024-04-29 02:17:42 -07:00 committed by GitHub
commit 3a68b84d8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 427 additions and 320 deletions

View File

@ -27,7 +27,6 @@ import (
"os"
"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
"k8s.io/apimachinery/pkg/runtime"
@ -185,7 +184,7 @@ func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregat
}
// aggregator comes last in the chain
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.ControlPlane.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err

View File

@ -0,0 +1,91 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/klog/v2"
)
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error)
}
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (s *Server) InstallAPIs(restStorageProviders ...RESTStorageProvider) error {
nonLegacy := []*genericapiserver.APIGroupInfo{}
// used later in the loop to filter the served resource by those that have expired.
resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*s.GenericAPIServer.Version)
if err != nil {
return err
}
for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
apiGroupInfo, err := restStorageBuilder.NewRESTStorage(s.APIResourceConfigSource, s.RESTOptionsGetter)
if err != nil {
return fmt.Errorf("problem initializing API group %q: %w", groupName, err)
}
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
// If we have no storage for any resource configured, this API group is effectively disabled.
// This can happen when an entire API group, version, or development-stage (alpha, beta, GA) is disabled.
klog.Infof("API group %q is not enabled, skipping.", groupName)
continue
}
// Remove resources that serving kinds that are removed.
// We do this here so that we don't accidentally serve versions without resources or openapi information that for kinds we don't serve.
// This is a spot above the construction of individual storage handlers so that no sig accidentally forgets to check.
resourceExpirationEvaluator.RemoveDeletedKinds(groupName, apiGroupInfo.Scheme, apiGroupInfo.VersionedResourcesStorageMap)
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
klog.V(1).Infof("Removing API group %v because it is time to stop serving it because it has no versions per APILifecycle.", groupName)
continue
}
klog.V(1).Infof("Enabling API group %q.", groupName)
if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
name, hook, err := postHookProvider.PostStartHook()
if err != nil {
return fmt.Errorf("error building PostStartHook: %w", err)
}
s.GenericAPIServer.AddPostStartHookOrDie(name, hook)
}
if len(groupName) == 0 {
// the legacy group for core APIs is special that it is installed into /api via this special install method.
if err := s.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering legacy API: %w", err)
}
} else {
// everything else goes to /apis
nonLegacy = append(nonLegacy, &apiGroupInfo)
}
}
if err := s.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
return fmt.Errorf("error in registering group versions: %w", err)
}
return nil
}

View File

@ -18,6 +18,8 @@ package apiserver
import (
"fmt"
"net"
"strconv"
"time"
"k8s.io/apimachinery/pkg/runtime"
@ -87,3 +89,13 @@ func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}
// utility function to get the apiserver address that is used by peer apiservers to proxy
// requests to this apiserver in case the peer is incapable of serving the request
func getPeerAddress(peerAdvertiseAddress reconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {
if peerAdvertiseAddress.PeerAdvertiseIP != "" && peerAdvertiseAddress.PeerAdvertisePort != "" {
return net.JoinHostPort(peerAdvertiseAddress.PeerAdvertiseIP, peerAdvertiseAddress.PeerAdvertisePort)
} else {
return net.JoinHostPort(publicAddress.String(), strconv.Itoa(publicServicePort))
}
}

View File

@ -0,0 +1,285 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"os"
"time"
coordinationapiv1 "k8s.io/api/coordination/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
apiserverfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientgoinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/component-helpers/apimachinery/lease"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/routes"
"k8s.io/kubernetes/pkg/serviceaccount"
)
var (
// IdentityLeaseGCPeriod is the interval which the lease GC controller checks for expired leases
// IdentityLeaseGCPeriod is exposed so integration tests can tune this value.
IdentityLeaseGCPeriod = 3600 * time.Second
// IdentityLeaseDurationSeconds is the duration of kube-apiserver lease in seconds
// IdentityLeaseDurationSeconds is exposed so integration tests can tune this value.
IdentityLeaseDurationSeconds = 3600
// IdentityLeaseRenewIntervalPeriod is the interval of kube-apiserver renewing its lease in seconds
// IdentityLeaseRenewIntervalPeriod is exposed so integration tests can tune this value.
IdentityLeaseRenewIntervalPeriod = 10 * time.Second
)
const (
// IdentityLeaseComponentLabelKey is used to apply a component label to identity lease objects, indicating:
// 1. the lease is an identity lease (different from leader election leases)
// 2. which component owns this lease
IdentityLeaseComponentLabelKey = "apiserver.kubernetes.io/identity"
)
// Server is a struct that contains a generic control plane apiserver instance
// that can be run to start serving the APIs.
type Server struct {
GenericAPIServer *genericapiserver.GenericAPIServer
APIResourceConfigSource serverstorage.APIResourceConfigSource
RESTOptionsGetter genericregistry.RESTOptionsGetter
ClusterAuthenticationInfo clusterauthenticationtrust.ClusterAuthenticationInfo
VersionedInformers clientgoinformers.SharedInformerFactory
}
// New returns a new instance of Master from the given config.
// Certain config fields will be set to a default value if unset.
// Certain config fields must be specified, including:
// KubeletClientConfig
func (c completedConfig) New(name string, delegationTarget genericapiserver.DelegationTarget) (*Server, error) {
generic, err := c.Generic.New(name, delegationTarget)
if err != nil {
return nil, err
}
if c.EnableLogsSupport {
routes.Logs{}.Install(generic.Handler.GoRestfulContainer)
}
// Metadata and keys are expected to only change across restarts at present,
// so we just marshal immediately and serve the cached JSON bytes.
md, err := serviceaccount.NewOpenIDMetadata(
c.ServiceAccountIssuerURL,
c.ServiceAccountJWKSURI,
c.Generic.ExternalAddress,
c.ServiceAccountPublicKeys,
)
if err != nil {
// If there was an error, skip installing the endpoints and log the
// error, but continue on. We don't return the error because the
// metadata responses require additional, backwards incompatible
// validation of command-line options.
msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
" ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
" enabled. Error: %v", err)
if c.ServiceAccountIssuerURL != "" {
// The user likely expects this feature to be enabled if issuer URL is
// set and the feature gate is enabled. In the future, if there is no
// longer a feature gate and issuer URL is not set, the user may not
// expect this feature to be enabled. We log the former case as an Error
// and the latter case as an Info.
klog.Error(msg)
} else {
klog.Info(msg)
}
} else {
routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
Install(generic.Handler.GoRestfulContainer)
}
s := &Server{
GenericAPIServer: generic,
APIResourceConfigSource: c.APIResourceConfigSource,
RESTOptionsGetter: c.Generic.RESTOptionsGetter,
ClusterAuthenticationInfo: c.ClusterAuthenticationInfo,
VersionedInformers: c.VersionedInformers,
}
client := kubernetes.NewForConfigOrDie(s.GenericAPIServer.LoopbackClientConfig)
if len(c.SystemNamespaces) > 0 {
s.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go systemnamespaces.NewController(c.SystemNamespaces, client, s.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
return nil
})
}
_, publicServicePort, err := c.Generic.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
c.Generic.APIServerID,
peeraddress,
c.Extra.PeerEndpointLeaseReconciler,
c.Extra.PeerEndpointReconcileInterval,
client)
if err != nil {
return nil, fmt.Errorf("failed to create peer endpoint lease controller: %w", err)
}
s.GenericAPIServer.AddPostStartHookOrDie("peer-endpoint-reconciler-controller",
func(hookContext genericapiserver.PostStartHookContext) error {
peerEndpointCtrl.Start(hookContext.StopCh)
return nil
})
s.GenericAPIServer.AddPreShutdownHookOrDie("peer-endpoint-reconciler-controller",
func() error {
peerEndpointCtrl.Stop()
return nil
})
if c.Extra.PeerProxy != nil {
s.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
return err
})
}
}
s.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(s.ClusterAuthenticationInfo, client)
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx := wait.ContextForChannel(hookContext.StopCh)
// prime values and start listeners
if s.ClusterAuthenticationInfo.ClientCA != nil {
s.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
if controller, ok := s.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
if s.ClusterAuthenticationInfo.RequestHeaderCA != nil {
s.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
if controller, ok := s.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
go controller.Run(ctx, 1)
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx := wait.ContextForChannel(hookContext.StopCh)
leaseName := s.GenericAPIServer.APIServerID
holderIdentity := s.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.Generic.PublicAddress, publicServicePort)
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
controller := lease.NewController(
clock.RealClock{},
client,
holderIdentity,
int32(IdentityLeaseDurationSeconds),
nil,
IdentityLeaseRenewIntervalPeriod,
leaseName,
metav1.NamespaceSystem,
// TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver.
labelAPIServerHeartbeatFunc(name, peeraddress))
go controller.Run(ctx)
return nil
})
// TODO: move this into generic apiserver and make the lease identity value configurable
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
go apiserverleasegc.NewAPIServerLeaseGC(
client,
IdentityLeaseGCPeriod,
metav1.NamespaceSystem,
IdentityLeaseComponentLabelKey+"="+name,
).Run(hookContext.StopCh)
return nil
})
}
s.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go legacytokentracking.NewController(client).Run(hookContext.StopCh)
return nil
})
return s, nil
}
func labelAPIServerHeartbeatFunc(identity string, peeraddress string) lease.ProcessLeaseFunc {
return func(lease *coordinationapiv1.Lease) error {
if lease.Labels == nil {
lease.Labels = map[string]string{}
}
if lease.Annotations == nil {
lease.Annotations = map[string]string{}
}
// This label indiciates the identity of the lease object.
lease.Labels[IdentityLeaseComponentLabelKey] = identity
hostname, err := os.Hostname()
if err != nil {
return err
}
// convenience label to easily map a lease object to a specific apiserver
lease.Labels[apiv1.LabelHostname] = hostname
// Include apiserver network location <ip_port> used by peers to proxy requests between kube-apiservers
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if peeraddress != "" {
lease.Annotations[apiv1.AnnotationPeerAdvertiseAddress] = peeraddress
}
}
return nil
}
}

View File

@ -19,7 +19,6 @@ package controlplane
import (
"fmt"
"net"
"os"
"reflect"
"strconv"
"time"
@ -53,24 +52,15 @@ import (
storageapiv1alpha1 "k8s.io/api/storage/v1alpha1"
storageapiv1beta1 "k8s.io/api/storage/v1beta1"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery"
apiserverfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
"k8s.io/component-helpers/apimachinery/lease"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
flowcontrolv1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1"
@ -79,19 +69,12 @@ import (
flowcontrolv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/pkg/controlplane/apiserver/options"
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/controller/defaultservicecidr"
"k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice"
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/features"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/routes"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/utils/clock"
// RESTStorage installers
admissionregistrationrest "k8s.io/kubernetes/pkg/registry/admissionregistration/rest"
@ -126,27 +109,14 @@ const (
// IdentityLeaseComponentLabelKey is used to apply a component label to identity lease objects, indicating:
// 1. the lease is an identity lease (different from leader election leases)
// 2. which component owns this lease
IdentityLeaseComponentLabelKey = "apiserver.kubernetes.io/identity"
// TODO(sttts): remove this indirection
IdentityLeaseComponentLabelKey = controlplaneapiserver.IdentityLeaseComponentLabelKey
// KubeAPIServer defines variable used internally when referring to kube-apiserver component
KubeAPIServer = "kube-apiserver"
// KubeAPIServerIdentityLeaseLabelSelector selects kube-apiserver identity leases
KubeAPIServerIdentityLeaseLabelSelector = IdentityLeaseComponentLabelKey + "=" + KubeAPIServer
// repairLoopInterval defines the interval used to run the Services ClusterIP and NodePort repair loops
repairLoopInterval = 3 * time.Minute
)
var (
// IdentityLeaseGCPeriod is the interval which the lease GC controller checks for expired leases
// IdentityLeaseGCPeriod is exposed so integration tests can tune this value.
IdentityLeaseGCPeriod = 3600 * time.Second
// IdentityLeaseDurationSeconds is the duration of kube-apiserver lease in seconds
// IdentityLeaseDurationSeconds is exposed so integration tests can tune this value.
IdentityLeaseDurationSeconds = 3600
// IdentityLeaseRenewIntervalPeriod is the interval of kube-apiserver renewing its lease in seconds
// IdentityLeaseRenewIntervalPeriod is exposed so integration tests can tune this value.
IdentityLeaseRenewIntervalPeriod = 10 * time.Second
)
// Extra defines extra configuration for kube-apiserver
type Extra struct {
EndpointReconcilerConfig EndpointReconcilerConfig
@ -222,9 +192,7 @@ type EndpointReconcilerConfig struct {
// Instance contains state for a Kubernetes cluster api server instance.
type Instance struct {
GenericAPIServer *genericapiserver.GenericAPIServer
ClusterAuthenticationInfo clusterauthenticationtrust.ClusterAuthenticationInfo
ControlPlane *controlplaneapiserver.Server
}
func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler {
@ -340,49 +308,13 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
}
s, err := c.ControlPlane.Generic.New("kube-apiserver", delegationTarget)
cp, err := c.ControlPlane.New(KubeAPIServer, delegationTarget)
if err != nil {
return nil, err
}
if c.ControlPlane.Extra.EnableLogsSupport {
routes.Logs{}.Install(s.Handler.GoRestfulContainer)
}
// Metadata and keys are expected to only change across restarts at present,
// so we just marshal immediately and serve the cached JSON bytes.
md, err := serviceaccount.NewOpenIDMetadata(
c.ControlPlane.Extra.ServiceAccountIssuerURL,
c.ControlPlane.Extra.ServiceAccountJWKSURI,
c.ControlPlane.Generic.ExternalAddress,
c.ControlPlane.Extra.ServiceAccountPublicKeys,
)
if err != nil {
// If there was an error, skip installing the endpoints and log the
// error, but continue on. We don't return the error because the
// metadata responses require additional, backwards incompatible
// validation of command-line options.
msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
" ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
" enabled. Error: %v", err)
if c.ControlPlane.Extra.ServiceAccountIssuerURL != "" {
// The user likely expects this feature to be enabled if issuer URL is
// set and the feature gate is enabled. In the future, if there is no
// longer a feature gate and issuer URL is not set, the user may not
// expect this feature to be enabled. We log the former case as an Error
// and the latter case as an Info.
klog.Error(msg)
} else {
klog.Info(msg)
}
} else {
routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
Install(s.Handler.GoRestfulContainer)
}
m := &Instance{
GenericAPIServer: s,
ClusterAuthenticationInfo: c.ControlPlane.Extra.ClusterAuthenticationInfo,
s := &Instance{
ControlPlane: cp,
}
client, err := kubernetes.NewForConfig(c.ControlPlane.Generic.LoopbackClientConfig)
@ -426,7 +358,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
// with specific priorities.
// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
// handlers that we have.
restStorageProviders := []RESTStorageProvider{
restStorageProviders := []controlplaneapiserver.RESTStorageProvider{
legacyRESTStorageProvider,
apiserverinternalrest.StorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.ControlPlane.Generic.Authentication.Authenticator, APIAudiences: c.ControlPlane.Generic.Authentication.APIAudiences},
@ -451,15 +383,10 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
eventsrest.RESTStorageProvider{TTL: c.ControlPlane.EventTTL},
resourcerest.RESTStorageProvider{},
}
if err := m.InstallAPIs(c.ControlPlane.Extra.APIResourceConfigSource, c.ControlPlane.Generic.RESTOptionsGetter, restStorageProviders...); err != nil {
if err := s.ControlPlane.InstallAPIs(restStorageProviders...); err != nil {
return nil, err
}
m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go systemnamespaces.NewController(c.ControlPlane.SystemNamespaces, client, c.ControlPlane.Extra.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
return nil
})
_, publicServicePort, err := c.ControlPlane.Generic.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
@ -475,17 +402,17 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.Extra.KubernetesServiceNodePort,
}, client, c.ControlPlane.Extra.VersionedInformers.Core().V1().Services())
s.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
s.ControlPlane.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
kubernetesServiceCtrl.Start(hookContext.StopCh)
return nil
})
s.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
s.ControlPlane.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
kubernetesServiceCtrl.Stop()
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kubernetes-service-cidr-controller", func(hookContext genericapiserver.PostStartHookContext) error {
s.ControlPlane.GenericAPIServer.AddPostStartHookOrDie("start-kubernetes-service-cidr-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := defaultservicecidr.NewController(
c.Extra.ServiceIPRange,
c.Extra.SecondaryServiceIPRange,
@ -498,208 +425,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
})
}
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
c.ControlPlane.Generic.APIServerID,
peeraddress,
c.ControlPlane.Extra.PeerEndpointLeaseReconciler,
c.Extra.EndpointReconcilerConfig.Interval,
client)
if err != nil {
return nil, fmt.Errorf("failed to create peer endpoint lease controller: %w", err)
}
m.GenericAPIServer.AddPostStartHookOrDie("peer-endpoint-reconciler-controller",
func(hookContext genericapiserver.PostStartHookContext) error {
peerEndpointCtrl.Start(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPreShutdownHookOrDie("peer-endpoint-reconciler-controller",
func() error {
peerEndpointCtrl.Stop()
return nil
})
// Add PostStartHooks for Unknown Version Proxy filter.
if c.ControlPlane.Extra.PeerProxy != nil {
m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.ControlPlane.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
return err
})
}
}
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, client)
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx := wait.ContextForChannel(hookContext.StopCh)
// prime values and start listeners
if m.ClusterAuthenticationInfo.ClientCA != nil {
m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
// runonce to be sure that we have a value.
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(ctx, 1)
}
}
go controller.Run(ctx, 1)
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
// generate a context from stopCh. This is to avoid modifying files which are relying on apiserver
// TODO: See if we can pass ctx to the current method
ctx := wait.ContextForChannel(hookContext.StopCh)
leaseName := m.GenericAPIServer.APIServerID
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
controller := lease.NewController(
clock.RealClock{},
client,
holderIdentity,
int32(IdentityLeaseDurationSeconds),
nil,
IdentityLeaseRenewIntervalPeriod,
leaseName,
metav1.NamespaceSystem,
// TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver.
labelAPIServerHeartbeatFunc(KubeAPIServer, peeraddress))
go controller.Run(ctx)
return nil
})
// TODO: move this into generic apiserver and make the lease identity value configurable
m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
go apiserverleasegc.NewAPIServerLeaseGC(
client,
IdentityLeaseGCPeriod,
metav1.NamespaceSystem,
KubeAPIServerIdentityLeaseLabelSelector,
).Run(hookContext.StopCh)
return nil
})
}
m.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
go legacytokentracking.NewController(client).Run(hookContext.StopCh)
return nil
})
return m, nil
}
func labelAPIServerHeartbeatFunc(identity string, peeraddress string) lease.ProcessLeaseFunc {
return func(lease *coordinationapiv1.Lease) error {
if lease.Labels == nil {
lease.Labels = map[string]string{}
}
if lease.Annotations == nil {
lease.Annotations = map[string]string{}
}
// This label indiciates the identity of the lease object.
lease.Labels[IdentityLeaseComponentLabelKey] = identity
hostname, err := os.Hostname()
if err != nil {
return err
}
// convenience label to easily map a lease object to a specific apiserver
lease.Labels[apiv1.LabelHostname] = hostname
// Include apiserver network location <ip_port> used by peers to proxy requests between kube-apiservers
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if peeraddress != "" {
lease.Annotations[apiv1.AnnotationPeerAdvertiseAddress] = peeraddress
}
}
return nil
}
}
// RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error)
}
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
nonLegacy := []*genericapiserver.APIGroupInfo{}
// used later in the loop to filter the served resource by those that have expired.
resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*m.GenericAPIServer.Version)
if err != nil {
return err
}
for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if err != nil {
return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
}
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
// If we have no storage for any resource configured, this API group is effectively disabled.
// This can happen when an entire API group, version, or development-stage (alpha, beta, GA) is disabled.
klog.Infof("API group %q is not enabled, skipping.", groupName)
continue
}
// Remove resources that serving kinds that are removed.
// We do this here so that we don't accidentally serve versions without resources or openapi information that for kinds we don't serve.
// This is a spot above the construction of individual storage handlers so that no sig accidentally forgets to check.
resourceExpirationEvaluator.RemoveDeletedKinds(groupName, apiGroupInfo.Scheme, apiGroupInfo.VersionedResourcesStorageMap)
if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
klog.V(1).Infof("Removing API group %v because it is time to stop serving it because it has no versions per APILifecycle.", groupName)
continue
}
klog.V(1).Infof("Enabling API group %q.", groupName)
if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
name, hook, err := postHookProvider.PostStartHook()
if err != nil {
klog.Fatalf("Error building PostStartHook: %v", err)
}
m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
}
if len(groupName) == 0 {
// the legacy group for core APIs is special that it is installed into /api via this special install method.
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering legacy API: %w", err)
}
} else {
// everything else goes to /apis
nonLegacy = append(nonLegacy, &apiGroupInfo)
}
}
if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
return fmt.Errorf("error in registering group versions: %v", err)
}
return nil
return s, nil
}
var (
@ -772,13 +498,3 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
return ret
}
// utility function to get the apiserver address that is used by peer apiservers to proxy
// requests to this apiserver in case the peer is incapable of serving the request
func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {
if peerAdvertiseAddress.PeerAdvertiseIP != "" && peerAdvertiseAddress.PeerAdvertisePort != "" {
return net.JoinHostPort(peerAdvertiseAddress.PeerAdvertiseIP, peerAdvertiseAddress.PeerAdvertisePort)
} else {
return net.JoinHostPort(publicAddress.String(), strconv.Itoa(publicServicePort))
}
}

View File

@ -224,7 +224,7 @@ func TestVersion(t *testing.T) {
req, _ := http.NewRequest("GET", "/version", nil)
resp := httptest.NewRecorder()
s.GenericAPIServer.Handler.ServeHTTP(resp, req)
s.ControlPlane.GenericAPIServer.Handler.ServeHTTP(resp, req)
if resp.Code != 200 {
t.Fatalf("expected http 200, got: %d", resp.Code)
}
@ -259,7 +259,7 @@ func TestAPIVersionOfDiscoveryEndpoints(t *testing.T) {
apiserver, etcdserver, _, assert := newInstance(t)
defer etcdserver.Terminate(t)
server := httptest.NewServer(apiserver.GenericAPIServer.Handler.GoRestfulContainer.ServeMux)
server := httptest.NewServer(apiserver.ControlPlane.GenericAPIServer.Handler.GoRestfulContainer.ServeMux)
// /api exists in release-1.1
resp, err := http.Get(server.URL + "/api")
@ -316,7 +316,7 @@ func TestStorageVersionHashes(t *testing.T) {
apiserver, etcdserver, _, _ := newInstance(t)
defer etcdserver.Terminate(t)
server := httptest.NewServer(apiserver.GenericAPIServer.Handler.GoRestfulContainer.ServeMux)
server := httptest.NewServer(apiserver.ControlPlane.GenericAPIServer.Handler.GoRestfulContainer.ServeMux)
c := &restclient.Config{
Host: server.URL,

View File

@ -39,7 +39,7 @@ import (
"k8s.io/klog/v2"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversiongc"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
@ -131,9 +131,9 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
// set lease duration to 1s for serverA to ensure that storageversions for serverA are updated
// once it is shutdown
controlplane.IdentityLeaseDurationSeconds = 10
controlplane.IdentityLeaseGCPeriod = time.Second
controlplane.IdentityLeaseRenewIntervalPeriod = 10 * time.Second
controlplaneapiserver.IdentityLeaseDurationSeconds = 10
controlplaneapiserver.IdentityLeaseGCPeriod = time.Second
controlplaneapiserver.IdentityLeaseRenewIntervalPeriod = 10 * time.Second
// start serverA with all APIs enabled
// override hostname to ensure unique ips
@ -146,7 +146,7 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
setupStorageVersionGC(ctx, kubeClientSetA, informersA)
// reset lease duration to default value for serverB and serverC since we will not be
// shutting these down
controlplane.IdentityLeaseDurationSeconds = 3600
controlplaneapiserver.IdentityLeaseDurationSeconds = 3600
// start serverB with some api disabled
// override hostname to ensure unique ips

View File

@ -39,6 +39,7 @@ import (
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
)
@ -84,7 +85,7 @@ func TestCreateLeaseOnStart(t *testing.T) {
leases, err := kubeclient.
CoordinationV1().
Leases(metav1.NamespaceSystem).
List(context.TODO(), metav1.ListOptions{LabelSelector: controlplane.KubeAPIServerIdentityLeaseLabelSelector})
List(context.TODO(), metav1.ListOptions{LabelSelector: controlplaneapiserver.IdentityLeaseComponentLabelKey + "=" + controlplane.KubeAPIServer})
if err != nil {
return false, err
}
@ -113,20 +114,20 @@ func TestCreateLeaseOnStart(t *testing.T) {
}
func TestLeaseGarbageCollection(t *testing.T) {
oldIdentityLeaseDurationSeconds := controlplane.IdentityLeaseDurationSeconds
oldIdentityLeaseGCPeriod := controlplane.IdentityLeaseGCPeriod
oldIdentityLeaseRenewIntervalPeriod := controlplane.IdentityLeaseRenewIntervalPeriod
oldIdentityLeaseDurationSeconds := controlplaneapiserver.IdentityLeaseDurationSeconds
oldIdentityLeaseGCPeriod := controlplaneapiserver.IdentityLeaseGCPeriod
oldIdentityLeaseRenewIntervalPeriod := controlplaneapiserver.IdentityLeaseRenewIntervalPeriod
defer func() {
// reset the default values for leases after this test
controlplane.IdentityLeaseDurationSeconds = oldIdentityLeaseDurationSeconds
controlplane.IdentityLeaseGCPeriod = oldIdentityLeaseGCPeriod
controlplane.IdentityLeaseRenewIntervalPeriod = oldIdentityLeaseRenewIntervalPeriod
controlplaneapiserver.IdentityLeaseDurationSeconds = oldIdentityLeaseDurationSeconds
controlplaneapiserver.IdentityLeaseGCPeriod = oldIdentityLeaseGCPeriod
controlplaneapiserver.IdentityLeaseRenewIntervalPeriod = oldIdentityLeaseRenewIntervalPeriod
}()
// Shorten lease parameters so GC behavior can be exercised in integration tests
controlplane.IdentityLeaseDurationSeconds = 1
controlplane.IdentityLeaseGCPeriod = time.Second
controlplane.IdentityLeaseRenewIntervalPeriod = time.Second
controlplaneapiserver.IdentityLeaseDurationSeconds = 1
controlplaneapiserver.IdentityLeaseGCPeriod = time.Second
controlplaneapiserver.IdentityLeaseRenewIntervalPeriod = time.Second
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
@ -206,7 +207,7 @@ func newTestLease(acquireTime time.Time, namespace string) *coordinationv1.Lease
Name: testLeaseName,
Namespace: namespace,
Labels: map[string]string{
controlplane.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
},
},
Spec: coordinationv1.LeaseSpec{

View File

@ -176,7 +176,7 @@ func StartTestServer(ctx context.Context, t testing.TB, setup TestServerSetup) (
errCh = make(chan error)
go func() {
defer close(errCh)
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(ctx.Done()); err != nil {
if err := kubeAPIServer.ControlPlane.GenericAPIServer.PrepareRun().Run(ctx.Done()); err != nil {
errCh <- err
}
}()

View File

@ -38,6 +38,8 @@ func TestApiserverExportsSymbols(t *testing.T) {
},
}
_ = &controlplane.Instance{
GenericAPIServer: &genericapiserver.GenericAPIServer{},
ControlPlane: &controlplaneapiserver.Server{
GenericAPIServer: &genericapiserver.GenericAPIServer{},
},
}
}

View File

@ -37,6 +37,7 @@ import (
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversiongc"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
)
@ -174,7 +175,7 @@ func createTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface,
Name: name,
Namespace: metav1.NamespaceSystem,
Labels: map[string]string{
controlplane.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
},
},
Spec: coordinationv1.LeaseSpec{