Merge pull request #124576 from sttts/sttts-peer-proxy-generic-move

controlplane/apiserver: move peer proxy code to allow generic aggregator construction
This commit is contained in:
Kubernetes Prow Robot 2024-04-29 01:11:06 -07:00 committed by GitHub
commit c6b6163e2e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 122 additions and 71 deletions

View File

@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
c.ApiExtensions = apiExtensions
aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.Extra.PeerProxy, pluginInitializer)
aggregator, err := createAggregatorConfig(*kubeAPIs.ControlPlane.Generic, opts.CompletedOptions, kubeAPIs.ControlPlane.VersionedInformers, serviceResolver, kubeAPIs.ControlPlane.ProxyTransport, kubeAPIs.ControlPlane.Extra.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}

View File

@ -266,14 +266,14 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
}
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
config.Extra.PeerEndpointLeaseReconciler, err = controlplane.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
config.ControlPlane.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
if err != nil {
return nil, nil, nil, err
}
// build peer proxy config only if peer ca file exists
if opts.PeerCAFile != "" {
config.Extra.PeerProxy, err = controlplane.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer)
config.ControlPlane.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ControlPlane.Extra.PeerEndpointLeaseReconciler, config.ControlPlane.Generic.Serializer)
if err != nil {
return nil, nil, nil, err
}

View File

@ -40,5 +40,9 @@ func (c *Config) Complete() CompletedConfig {
discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.Generic.ExternalAddress}
cfg.Generic.DiscoveryAddresses = discoveryAddresses
if cfg.Extra.PeerEndpointReconcileInterval == 0 {
cfg.Extra.PeerEndpointReconcileInterval = DefaultPeerEndpointReconcileInterval
}
return CompletedConfig{&cfg}
}

View File

@ -31,12 +31,14 @@ import (
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/openapi"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/component-base/version"
@ -67,6 +69,18 @@ type Extra struct {
EnableLogsSupport bool
ProxyTransport *http.Transport
// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
// that can not be served locally
PeerProxy utilpeerproxy.Interface
// PeerEndpointReconcileInterval defines how often the endpoint leases are reconciled in etcd.
PeerEndpointReconcileInterval time.Duration
// PeerEndpointLeaseReconciler updates the peer endpoint leases
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
ServiceAccountIssuer serviceaccount.TokenGenerator
ServiceAccountMaxExpiration time.Duration
ExtendExpiration bool

View File

@ -0,0 +1,89 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
)
const (
// DefaultPeerEndpointReconcileInterval is the default amount of time for how often
// the peer endpoint leases are reconciled.
DefaultPeerEndpointReconcileInterval = 10 * time.Second
// DefaultPeerEndpointReconcilerTTL is the default TTL timeout for peer endpoint
// leases on the storage layer
DefaultPeerEndpointReconcilerTTL = 15 * time.Second
)
func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
}}
// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {
ttl := DefaultPeerEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}

View File

@ -54,7 +54,6 @@ import (
storageapiv1beta1 "k8s.io/api/storage/v1beta1"
svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime"
@ -67,14 +66,10 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
"k8s.io/client-go/transport"
"k8s.io/component-helpers/apimachinery/lease"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
@ -157,16 +152,6 @@ type Extra struct {
EndpointReconcilerConfig EndpointReconcilerConfig
KubeletClientConfig kubeletclient.KubeletClientConfig
// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
// that can not be served locally
PeerProxy utilpeerproxy.Interface
// PeerEndpointLeaseReconciler updates the peer endpoint leases
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
// Values to build the IP addresses used by discovery
// The range of IPs to be assigned to services with type=ClusterIP or greater
ServiceIPRange net.IPNet
@ -290,6 +275,12 @@ func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
func (c *Config) Complete() CompletedConfig {
if c.ControlPlane.PeerEndpointReconcileInterval == 0 && c.EndpointReconcilerConfig.Interval != 0 {
// default this to the endpoint reconciler value before the generic
// controlplane completion can kick in
c.ControlPlane.PeerEndpointReconcileInterval = c.EndpointReconcilerConfig.Interval
}
cfg := completedConfig{
c.ControlPlane.Complete(),
&c.Extra,
@ -508,11 +499,11 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
c.ControlPlane.Generic.APIServerID,
peeraddress,
c.Extra.PeerEndpointLeaseReconciler,
c.ControlPlane.Extra.PeerEndpointLeaseReconciler,
c.Extra.EndpointReconcilerConfig.Interval,
client)
if err != nil {
@ -529,9 +520,9 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
// Add PostStartHooks for Unknown Version Proxy filter.
if c.Extra.PeerProxy != nil {
if c.ControlPlane.Extra.PeerProxy != nil {
m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
err := c.ControlPlane.Extra.PeerProxy.WaitForCacheSync(context.StopCh)
return err
})
}
@ -579,7 +570,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
leaseName := m.GenericAPIServer.APIServerID
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
peeraddress := getPeerAddress(c.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
peeraddress := getPeerAddress(c.ControlPlane.Extra.PeerAdvertiseAddress, c.ControlPlane.Generic.PublicAddress, publicServicePort)
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
controller := lease.NewController(
clock.RealClock{},
@ -782,53 +773,6 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
return ret
}
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (peerreconcilers.PeerEndpointLeaseReconciler, error) {
ttl := DefaultEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := peerreconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}
func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler peerreconcilers.PeerEndpointLeaseReconciler, serializer kruntime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
}}
// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}
// utility function to get the apiserver address that is used by peer apiservers to proxy
// requests to this apiserver in case the peer is incapable of serving the request
func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {