diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 7aa12d12b87..48264016b71 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -37,6 +37,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" utilfeature "k8s.io/apiserver/pkg/util/feature" + utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" kubeexternalinformers "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -57,6 +58,7 @@ func createAggregatorConfig( externalInformers kubeexternalinformers.SharedInformerFactory, serviceResolver aggregatorapiserver.ServiceResolver, proxyTransport *http.Transport, + peerProxy utilpeerproxy.Interface, pluginInitializers []admission.PluginInitializer, ) (*aggregatorapiserver.Config, error) { // make a shallow copy to let us twiddle a few things @@ -76,6 +78,16 @@ func createAggregatorConfig( genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition } + if peerProxy != nil { + originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc + genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler { + // Add peer proxy handler to aggregator-apiserver. + // wrap the peer proxy handler first. + apiHandler = peerProxy.WrapHandler(apiHandler) + return originalHandlerChainBuilder(apiHandler, c) + } + } + // copy the etcd options so we don't mutate originals. // we assume that the etcd options have been completed already. avoid messing with anything outside // of changes to StorageConfig as that may lead to unexpected behavior when the options are applied. 
@@ -104,6 +116,8 @@ func createAggregatorConfig( ExtraConfig: aggregatorapiserver.ExtraConfig{ ProxyClientCertFile: commandOptions.ProxyClientCertFile, ProxyClientKeyFile: commandOptions.ProxyClientKeyFile, + PeerCAFile: commandOptions.PeerCAFile, + PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress, ServiceResolver: serviceResolver, ProxyTransport: proxyTransport, RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects, diff --git a/cmd/kube-apiserver/app/config.go b/cmd/kube-apiserver/app/config.go index ef4671ab3fe..4ffe3f3798a 100644 --- a/cmd/kube-apiserver/app/config.go +++ b/cmd/kube-apiserver/app/config.go @@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } c.ApiExtensions = apiExtensions - aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, pluginInitializer) + aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer) if err != nil { return nil, err } diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index dad9383ed48..807c169631f 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -57,6 +57,7 @@ import ( "k8s.io/klog/v2" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -258,6 +259,21 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) ( }, } + if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { + config.ExtraConfig.PeerEndpointLeaseReconciler, err 
= controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory) + if err != nil { + return nil, nil, nil, err + } + // build peer proxy config only if peer ca file exists + if opts.PeerCAFile != "" { + config.ExtraConfig.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile, + opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ExtraConfig.PeerEndpointLeaseReconciler, config.GenericConfig.Serializer) + if err != nil { + return nil, nil, nil, err + } + } + } + clientCAProvider, err := opts.Authentication.ClientCert.GetClientCAContentProvider() if err != nil { return nil, nil, nil, err diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index f6778d15f95..4c032718c95 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -18,6 +18,7 @@ package testing import ( "context" + "crypto/rsa" "crypto/x509" "fmt" "net" @@ -38,12 +39,15 @@ import ( serveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storageversion" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + clientgotransport "k8s.io/client-go/transport" "k8s.io/client-go/util/cert" logsapi "k8s.io/component-base/logs/api/v1" "k8s.io/klog/v2" "k8s.io/kube-aggregator/pkg/apiserver" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" @@ -77,6 +81,14 @@ type TestServerInstanceOptions struct { EnableCertAuth bool // Wrap the storage version interface of the created server's generic server. StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager + // CA file used for requestheader authn during communication between: + // 1. 
kube-apiserver and peer when the local apiserver is not able to serve the request due + // to version skew + // 2. kube-apiserver and aggregated apiserver + + // We specify this as on option to pass a common proxyCA to multiple apiservers to simulate + // an apiserver version skew scenario where all apiservers use the same proxyCA to verify client connections. + ProxyCA *ProxyCA } // TestServer return values supplied by kube-test-ApiServer @@ -95,6 +107,16 @@ type Logger interface { Errorf(format string, args ...interface{}) Fatalf(format string, args ...interface{}) Logf(format string, args ...interface{}) + Cleanup(func()) +} + +// ProxyCA contains the certificate authority certificate and key which is used to verify client connections +// to kube-apiservers. The clients can be : +// 1. aggregated apiservers +// 2. peer kube-apiservers +type ProxyCA struct { + ProxySigningCert *x509.Certificate + ProxySigningKey *rsa.PrivateKey } // NewDefaultTestServerOptions Default options for TestServer instances @@ -161,14 +183,24 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo reqHeaders := serveroptions.NewDelegatingAuthenticationOptions() s.Authentication.RequestHeader = &reqHeaders.RequestHeader - // create certificates for aggregation and client-cert auth - proxySigningKey, err := testutil.NewPrivateKey() - if err != nil { - return result, err - } - proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) - if err != nil { - return result, err + var proxySigningKey *rsa.PrivateKey + var proxySigningCert *x509.Certificate + + if instanceOptions.ProxyCA != nil { + // use provided proxyCA + proxySigningKey = instanceOptions.ProxyCA.ProxySigningKey + proxySigningCert = instanceOptions.ProxyCA.ProxySigningCert + + } else { + // create certificates for aggregation and client-cert auth + proxySigningKey, err = testutil.NewPrivateKey() + if err != nil { + return result, err + } + 
proxySigningCert, err = cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) + if err != nil { + return result, err + } } proxyCACertFile := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "proxy-ca.crt") if err := os.WriteFile(proxyCACertFile, testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil { @@ -213,6 +245,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo return result, err } s.Authentication.ClientCert.ClientCA = clientCACertFile + if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { + // TODO: set up a general clean up for testserver + if clientgotransport.DialerStopCh == wait.NeverStop { + ctx, cancel := context.WithTimeout(context.Background(), time.Hour) + t.Cleanup(cancel) + clientgotransport.DialerStopCh = ctx.Done() + } + s.PeerCAFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, s.SecureServing.ServerCert.PairName+".crt") + } } s.SecureServing.ExternalAddress = s.SecureServing.Listener.Addr().(*net.TCPAddr).IP // use listener addr although it is a loopback device diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go index af4b82b2657..824469107ea 100644 --- a/pkg/controlplane/apiserver/config.go +++ b/pkg/controlplane/apiserver/config.go @@ -28,19 +28,25 @@ import ( "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericfeatures "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/reconcilers" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" "k8s.io/apiserver/pkg/server/filters" serverstorage "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/storageversion" utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/apiserver/pkg/util/openapi" + utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" clientgoinformers 
"k8s.io/client-go/informers" clientgoclientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/transport" "k8s.io/component-base/version" + "k8s.io/klog/v2" openapicommon "k8s.io/kube-openapi/pkg/common" "k8s.io/kubernetes/pkg/api/legacyscheme" + api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controlplane" controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options" "k8s.io/kubernetes/pkg/kubeapiserver" @@ -193,3 +199,50 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien s.GenericServerRunOptions.RequestTimeout/4, ), nil } + +// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop +// The peer endpoint leases are used to find network locations of apiservers for peer proxy +func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) { + ttl := controlplane.DefaultEndpointReconcilerTTL + config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo")) + if err != nil { + return nil, fmt.Errorf("error creating storage factory config: %w", err) + } + reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl) + return reconciler, err +} + +func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager, + proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress, + apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) { + if proxyClientCertFile == "" { + return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified") + } + if proxyClientKeyFile == "" { + return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified") + } + // create proxy client config + clientConfig := 
&transport.Config{ + TLS: transport.TLSConfig{ + Insecure: false, + CertFile: proxyClientCertFile, + KeyFile: proxyClientKeyFile, + CAFile: peerCAFile, + ServerName: "kubernetes.default.svc", + }} + + // build proxy transport + proxyRoundTripper, transportBuildingError := transport.New(clientConfig) + if transportBuildingError != nil { + klog.Error(transportBuildingError.Error()) + return nil, transportBuildingError + } + return utilpeerproxy.NewPeerProxyHandler( + versionedInformer, + svm, + proxyRoundTripper, + apiServerID, + reconciler, + serializer, + ), nil +} diff --git a/pkg/controlplane/apiserver/options/options.go b/pkg/controlplane/apiserver/options/options.go index 51836eaac65..d3e19ddce9c 100644 --- a/pkg/controlplane/apiserver/options/options.go +++ b/pkg/controlplane/apiserver/options/options.go @@ -24,6 +24,7 @@ import ( "strings" "time" + peerreconcilers "k8s.io/apiserver/pkg/reconcilers" genericoptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/client-go/util/keyutil" @@ -63,6 +64,16 @@ type Options struct { ProxyClientCertFile string ProxyClientKeyFile string + // PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers' + // serving certs when routing a request to the peer in the case the request can not be served + // locally due to version skew. + PeerCAFile string + + // PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request + // to this apiserver. This happens in cases where the peer is not able to serve the request due to + // version skew. + PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress + EnableAggregatorRouting bool AggregatorRejectForwardingRedirects bool @@ -154,6 +165,20 @@ func (s *Options) AddFlags(fss *cliflag.NamedFlagSets) { "when it must call out during a request. 
This includes proxying requests to a user "+ "api-server and calling out to webhook admission plugins.") + fs.StringVar(&s.PeerCAFile, "peer-ca-file", s.PeerCAFile, + "If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this file will be used to verify serving certificates of peer kube-apiservers. "+ + "This flag is only used in clusters configured with multiple kube-apiservers for high availability.") + + fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertiseIP, "peer-advertise-ip", s.PeerAdvertiseAddress.PeerAdvertiseIP, + "If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this IP will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+ + "when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+ + "This flag is only used in clusters configured with multiple kube-apiservers for high availability. ") + + fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertisePort, "peer-advertise-port", s.PeerAdvertiseAddress.PeerAdvertisePort, + "If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this port will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+ + "when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+ + "This flag is only used in clusters configured with multiple kube-apiservers for high availability. 
") + fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting, "Turns on aggregator routing requests to endpoints IP rather than cluster IP.") diff --git a/pkg/controlplane/apiserver/options/validation.go b/pkg/controlplane/apiserver/options/validation.go index 017938f92ae..879833ed2a7 100644 --- a/pkg/controlplane/apiserver/options/validation.go +++ b/pkg/controlplane/apiserver/options/validation.go @@ -25,6 +25,7 @@ import ( genericfeatures "k8s.io/apiserver/pkg/features" utilfeature "k8s.io/apiserver/pkg/util/feature" aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/api/legacyscheme" ) @@ -69,6 +70,32 @@ func validateAPIPriorityAndFairness(options *Options) []error { return nil } +func validateUnknownVersionInteroperabilityProxyFeature() []error { + if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + return nil + } + return []error{fmt.Errorf("UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled")} + } + return nil +} + +func validateUnknownVersionInteroperabilityProxyFlags(options *Options) []error { + err := []error{} + if !utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { + if options.PeerCAFile != "" { + err = append(err, fmt.Errorf("--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on")) + } + if options.PeerAdvertiseAddress.PeerAdvertiseIP != "" { + err = append(err, fmt.Errorf("--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on")) + } + if options.PeerAdvertiseAddress.PeerAdvertisePort != "" { + err = append(err, fmt.Errorf("--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on")) + } + } + return err +} + // Validate checks Options and return a 
slice of found errs. func (s *Options) Validate() []error { var errs []error @@ -83,6 +110,8 @@ func (s *Options) Validate() []error { errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...) errs = append(errs, validateTokenRequest(s)...) errs = append(errs, s.Metrics.Validate()...) + errs = append(errs, validateUnknownVersionInteroperabilityProxyFeature()...) + errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...) return errs } diff --git a/pkg/controlplane/apiserver/options/validation_test.go b/pkg/controlplane/apiserver/options/validation_test.go index 10def81242e..b9327d0d429 100644 --- a/pkg/controlplane/apiserver/options/validation_test.go +++ b/pkg/controlplane/apiserver/options/validation_test.go @@ -22,8 +22,13 @@ import ( kubeapiserveradmission "k8s.io/apiserver/pkg/admission" genericoptions "k8s.io/apiserver/pkg/server/options" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/component-base/featuregate" basemetrics "k8s.io/component-base/metrics" + "k8s.io/kubernetes/pkg/features" + peerreconcilers "k8s.io/apiserver/pkg/reconcilers" + featuregatetesting "k8s.io/component-base/featuregate/testing" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" ) @@ -80,6 +85,83 @@ func TestValidateAPIPriorityAndFairness(t *testing.T) { } } +func TestValidateUnknownVersionInteroperabilityProxy(t *testing.T) { + tests := []struct { + name string + featureEnabled bool + errShouldContain string + peerCAFile string + peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress + }{ + { + name: "feature disabled but peerCAFile set", + featureEnabled: false, + peerCAFile: "foo", + errShouldContain: "--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on", + }, + { + name: "feature disabled but peerAdvertiseIP set", + featureEnabled: false, + peerAdvertiseAddress: peerreconcilers.PeerAdvertiseAddress{PeerAdvertiseIP: "1.2.3.4"}, + 
errShouldContain: "--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on", + }, + { + name: "feature disabled but peerAdvertisePort set", + featureEnabled: false, + peerAdvertiseAddress: peerreconcilers.PeerAdvertiseAddress{PeerAdvertisePort: "1"}, + errShouldContain: "--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + options := &Options{ + PeerCAFile: test.peerCAFile, + PeerAdvertiseAddress: test.peerAdvertiseAddress, + } + if test.featureEnabled { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.UnknownVersionInteroperabilityProxy, true)() + } + var errMessageGot string + if errs := validateUnknownVersionInteroperabilityProxyFlags(options); len(errs) > 0 { + errMessageGot = errs[0].Error() + } + if !strings.Contains(errMessageGot, test.errShouldContain) { + t.Errorf("Expected error message to contain: %q, but got: %q", test.errShouldContain, errMessageGot) + } + + }) + } +} + +func TestValidateUnknownVersionInteroperabilityProxyFeature(t *testing.T) { + const conflict = "UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled" + tests := []struct { + name string + featuresEnabled []featuregate.Feature + }{ + { + name: "enabled: UnknownVersionInteroperabilityProxy, disabled: StorageVersionAPI", + featuresEnabled: []featuregate.Feature{features.UnknownVersionInteroperabilityProxy}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + for _, feature := range test.featuresEnabled { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, true)() + } + var errMessageGot string + if errs := validateUnknownVersionInteroperabilityProxyFeature(); len(errs) > 0 { + errMessageGot = errs[0].Error() + } + if !strings.Contains(errMessageGot, conflict) { + 
t.Errorf("Expected error message to contain: %q, but got: %q", conflict, errMessageGot) + } + }) + } +} + func TestValidateOptions(t *testing.T) { testCases := []struct { name string diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index c40f3849266..8667d66bd15 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -61,11 +61,13 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/endpoints/discovery" apiserverfeatures "k8s.io/apiserver/pkg/features" + peerreconcilers "k8s.io/apiserver/pkg/reconcilers" "k8s.io/apiserver/pkg/registry/generic" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" serverstorage "k8s.io/apiserver/pkg/server/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" + utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" @@ -83,6 +85,7 @@ import ( "k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking" "k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces" "k8s.io/kubernetes/pkg/controlplane/reconcilers" + "k8s.io/kubernetes/pkg/features" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/routes" @@ -157,6 +160,23 @@ type ExtraConfig struct { EnableLogsSupport bool ProxyTransport *http.Transport + // PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests + // that can not be served locally + PeerProxy utilpeerproxy.Interface + + // PeerEndpointLeaseReconciler updates the peer endpoint leases + PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler + + // PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers' + // serving certs when routing a request to the peer in the case the request can not be served + // locally due to version skew. 
+ PeerCAFile string + + // PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request + // to this apiserver. This happens in cases where the peer is not able to serve the request due to + // version skew. If unset, AdvertiseAddress/BindAddress will be used. + PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress + // Values to build the IP addresses used by discovery // The range of IPs to be assigned to services with type=ClusterIP or greater ServiceIPRange net.IPNet @@ -492,6 +512,36 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) return nil }) + if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { + peeraddress := getPeerAddress(c.ExtraConfig.PeerAdvertiseAddress, c.GenericConfig.PublicAddress, publicServicePort) + peerEndpointCtrl := peerreconcilers.New( + c.GenericConfig.APIServerID, + peeraddress, + c.ExtraConfig.PeerEndpointLeaseReconciler, + c.ExtraConfig.EndpointReconcilerConfig.Interval, + clientset) + if err != nil { + return nil, fmt.Errorf("failed to create peer endpoint lease controller: %w", err) + } + m.GenericAPIServer.AddPostStartHookOrDie("peer-endpoint-reconciler-controller", + func(hookContext genericapiserver.PostStartHookContext) error { + peerEndpointCtrl.Start(hookContext.StopCh) + return nil + }) + m.GenericAPIServer.AddPreShutdownHookOrDie("peer-endpoint-reconciler-controller", + func() error { + peerEndpointCtrl.Stop() + return nil + }) + // Add PostStartHooks for Unknown Version Proxy filter. 
+ if c.ExtraConfig.PeerProxy != nil { + m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error { + err := c.ExtraConfig.PeerProxy.WaitForCacheSync(context.StopCh) + return err + }) + } + } + m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error { controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, clientset) @@ -539,6 +589,8 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) leaseName := m.GenericAPIServer.APIServerID holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID()) + peeraddress := getPeerAddress(c.ExtraConfig.PeerAdvertiseAddress, c.GenericConfig.PublicAddress, publicServicePort) + // must replace ':,[]' in [ip:port] to be able to store this as a valid label value controller := lease.NewController( clock.RealClock{}, kubeClient, @@ -549,7 +601,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) leaseName, metav1.NamespaceSystem, // TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver. - labelAPIServerHeartbeatFunc(KubeAPIServer)) + labelAPIServerHeartbeatFunc(KubeAPIServer, peeraddress)) go controller.Run(ctx) return nil }) @@ -597,12 +649,16 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) return m, nil } -func labelAPIServerHeartbeatFunc(identity string) lease.ProcessLeaseFunc { +func labelAPIServerHeartbeatFunc(identity string, peeraddress string) lease.ProcessLeaseFunc { return func(lease *coordinationapiv1.Lease) error { if lease.Labels == nil { lease.Labels = map[string]string{} } + if lease.Annotations == nil { + lease.Annotations = map[string]string{} + } + // This label indiciates the identity of the lease object. 
lease.Labels[IdentityLeaseComponentLabelKey] = identity @@ -613,6 +669,13 @@ func labelAPIServerHeartbeatFunc(identity string) lease.ProcessLeaseFunc { // convenience label to easily map a lease object to a specific apiserver lease.Labels[apiv1.LabelHostname] = hostname + + // Include apiserver network location used by peers to proxy requests between kube-apiservers + if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { + if peeraddress != "" { + lease.Annotations[apiv1.AnnotationPeerAdvertiseAddress] = peeraddress + } + } return nil } } @@ -752,3 +815,13 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig { return ret } + +// utility function to get the apiserver address that is used by peer apiservers to proxy +// requests to this apiserver in case the peer is incapable of serving the request +func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string { + if peerAdvertiseAddress.PeerAdvertiseIP != "" && peerAdvertiseAddress.PeerAdvertisePort != "" { + return net.JoinHostPort(peerAdvertiseAddress.PeerAdvertiseIP, peerAdvertiseAddress.PeerAdvertisePort) + } else { + return net.JoinHostPort(publicAddress.String(), strconv.Itoa(publicServicePort)) + } +} diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 5973a3a2c45..ca92e95a5e8 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -867,6 +867,12 @@ const ( // Allow the usage of options to fine-tune the topology manager policies. TopologyManagerPolicyOptions featuregate.Feature = "TopologyManagerPolicyOptions" + // owner: @richabanker + // alpha: v1.28 + // + // Proxies client to an apiserver capable of serving the request in the event of version skew. 
+ UnknownVersionInteroperabilityProxy featuregate.Feature = "UnknownVersionInteroperabilityProxy" + // owner: @rata, @giuseppe // kep: https://kep.k8s.io/127 // alpha: v1.25 @@ -1157,6 +1163,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS TopologyManagerPolicyOptions: {Default: true, PreRelease: featuregate.Beta}, + UnknownVersionInteroperabilityProxy: {Default: false, PreRelease: featuregate.Alpha}, + VolumeCapacityPriority: {Default: false, PreRelease: featuregate.Alpha}, UserNamespacesSupport: {Default: false, PreRelease: featuregate.Alpha}, diff --git a/staging/src/k8s.io/api/core/v1/well_known_labels.go b/staging/src/k8s.io/api/core/v1/well_known_labels.go index 5cf82a98175..8c3cb87b82a 100644 --- a/staging/src/k8s.io/api/core/v1/well_known_labels.go +++ b/staging/src/k8s.io/api/core/v1/well_known_labels.go @@ -19,6 +19,10 @@ package v1 const ( LabelHostname = "kubernetes.io/hostname" + // AnnotationPeerAdvertiseAddress is the annotation key whose value is the network location (host:port) of a kube-apiserver. + // It is stored in APIServer Identity lease objects to show what address is used for peer proxy. + AnnotationPeerAdvertiseAddress = "kubernetes.io/peer-advertise-address" + LabelTopologyZone = "topology.kubernetes.io/zone" LabelTopologyRegion = "topology.kubernetes.io/region" diff --git a/staging/src/k8s.io/apiserver/go.mod b/staging/src/k8s.io/apiserver/go.mod index ff4ab39e1bf..e1ca4fde140 100644 --- a/staging/src/k8s.io/apiserver/go.mod +++ b/staging/src/k8s.io/apiserver/go.mod @@ -89,6 +89,7 @@ require ( github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/pquerna/cachecontrol v0.1.0 // indirect diff --git a/staging/src/k8s.io/apiserver/go.sum
b/staging/src/k8s.io/apiserver/go.sum index 59ac0758f19..732caf1a56d 100644 --- a/staging/src/k8s.io/apiserver/go.sum +++ b/staging/src/k8s.io/apiserver/go.sum @@ -382,6 +382,7 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM= diff --git a/staging/src/k8s.io/apiserver/pkg/reconcilers/peer_endpoint_lease.go b/staging/src/k8s.io/apiserver/pkg/reconcilers/peer_endpoint_lease.go new file mode 100644 index 00000000000..4da16d4f142 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/reconcilers/peer_endpoint_lease.go @@ -0,0 +1,364 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package reconcilers + +import ( + "context" + "fmt" + "net" + "net/http" + "path" + "strconv" + "sync" + "sync/atomic" + "time" + + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/storage" + "k8s.io/apiserver/pkg/storage/storagebackend" + storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" +) + +const ( + APIServerIdentityLabel = "apiserverIdentity" +) + +type PeerAdvertiseAddress struct { + PeerAdvertiseIP string + PeerAdvertisePort string +} + +type peerEndpointLeases struct { + storage storage.Interface + destroyFn func() + baseKey string + leaseTime time.Duration +} + +type PeerEndpointLeaseReconciler interface { + // GetEndpoint retrieves the endpoint for a given apiserverId + GetEndpoint(serverId string) (string, error) + // UpdateLease updates the ip and port of peer servers + UpdateLease(serverId string, ip string, endpointPorts []corev1.EndpointPort) error + // RemoveEndpoints removes this apiserver's peer endpoint lease. + RemoveLease(serverId string) error + // Destroy cleans up everything on shutdown. + Destroy() + // StopReconciling turns any later ReconcileEndpoints call into a noop. 
+ StopReconciling() +} + +type peerEndpointLeaseReconciler struct { + serverLeases *peerEndpointLeases + stopReconcilingCalled atomic.Bool +} + +// NewPeerEndpointLeaseReconciler creates a new peer endpoint lease reconciler +func NewPeerEndpointLeaseReconciler(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (PeerEndpointLeaseReconciler, error) { + leaseStorage, destroyFn, err := storagefactory.Create(*config, nil) + if err != nil { + return nil, fmt.Errorf("error creating storage factory: %v", err) + } + var once sync.Once + return &peerEndpointLeaseReconciler{ + serverLeases: &peerEndpointLeases{ + storage: leaseStorage, + destroyFn: func() { once.Do(destroyFn) }, + baseKey: baseKey, + leaseTime: leaseTime, + }, + }, nil +} + +// PeerEndpointController is the controller manager for updating the peer endpoint leases. +// This provides a separate independent reconciliation loop for peer endpoint leases +// which ensures that the peer kube-apiservers are fetching the updated endpoint info for a given apiserver +// in the case when the peer wants to proxy the request to the given apiserver because it can not serve the +// request itself due to version mismatch. +type PeerEndpointLeaseController struct { + reconciler PeerEndpointLeaseReconciler + endpointInterval time.Duration + serverId string + // peeraddress stores the IP and port of this kube-apiserver. Used by peer kube-apiservers to + // route request to this apiserver in case of a version skew. + peeraddress string + + client kubernetes.Interface + + lock sync.Mutex + stopCh chan struct{} // closed by Stop() +} + +func New(serverId string, peeraddress string, + reconciler PeerEndpointLeaseReconciler, endpointInterval time.Duration, client kubernetes.Interface) *PeerEndpointLeaseController { + return &PeerEndpointLeaseController{ + reconciler: reconciler, + serverId: serverId, + // peeraddress stores the IP and port of this kube-apiserver. 
Used by peer kube-apiservers to + // route request to this apiserver in case of a version skew. + peeraddress: peeraddress, + endpointInterval: endpointInterval, + client: client, + stopCh: make(chan struct{}), + } +} + +// Start begins the peer endpoint lease reconciler loop that must exist for bootstrapping +// a cluster. +func (c *PeerEndpointLeaseController) Start(stopCh <-chan struct{}) { + localStopCh := make(chan struct{}) + go func() { + defer close(localStopCh) + select { + case <-stopCh: // from Start + case <-c.stopCh: // from Stop + } + }() + go c.Run(localStopCh) +} + +// RunPeerEndpointReconciler periodically updates the peer endpoint leases +func (c *PeerEndpointLeaseController) Run(stopCh <-chan struct{}) { + // wait until process is ready + wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { + var code int + c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code) + return code == http.StatusOK, nil + }, stopCh) + + wait.NonSlidingUntil(func() { + if err := c.UpdatePeerEndpointLeases(); err != nil { + runtime.HandleError(fmt.Errorf("unable to update peer endpoint leases: %v", err)) + } + }, c.endpointInterval, stopCh) +} + +// Stop cleans up this apiserver's peer endpoint leases. +func (c *PeerEndpointLeaseController) Stop() { + c.lock.Lock() + defer c.lock.Unlock() + + select { + case <-c.stopCh: + return // only close once + default: + close(c.stopCh) + } + finishedReconciling := make(chan struct{}) + go func() { + defer close(finishedReconciling) + klog.Infof("Shutting down peer endpoint lease reconciler") + // stop reconciliation + c.reconciler.StopReconciling() + + // Ensure that there will be no race condition with the ReconcileEndpointLeases. 
+ if err := c.reconciler.RemoveLease(c.serverId); err != nil { + klog.Errorf("Unable to remove peer endpoint leases: %v", err) + } + c.reconciler.Destroy() + }() + + select { + case <-finishedReconciling: + // done + case <-time.After(2 * c.endpointInterval): + // don't block server shutdown forever if we can't reach etcd to remove ourselves + klog.Warning("peer_endpoint_controller's RemoveEndpoints() timed out") + } +} + +// UpdatePeerEndpointLeases attempts to update the peer endpoint leases. +func (c *PeerEndpointLeaseController) UpdatePeerEndpointLeases() error { + host, port, err := net.SplitHostPort(c.peeraddress) + if err != nil { + return err + } + + p, err := strconv.Atoi(port) + if err != nil { + return err + } + endpointPorts := createEndpointPortSpec(p, "https") + + // Ensure that there will be no race condition with the RemoveEndpointLeases. + c.lock.Lock() + defer c.lock.Unlock() + + // Refresh the TTL on our key, independently of whether any error or + // update conflict happens below. This makes sure that at least some of + // the servers will add our endpoint lease. + if err := c.reconciler.UpdateLease(c.serverId, host, endpointPorts); err != nil { + return err + } + return nil +} + +// UpdateLease resets the TTL on a server IP in storage +// UpdateLease will create a new key if it doesn't exist. +// We use the first element in endpointPorts as a part of the lease's base key +// This is done to support out tests that simulate 2 apiservers running on the same ip but +// different ports + +// It will also do the following if UnknownVersionInteroperabilityProxy feature is enabled +// 1. store the apiserverId as a label +// 2. 
store the values passed to --peer-advertise-ip and --peer-advertise-port flags to kube-apiserver as an annotation +// with value of format +func (r *peerEndpointLeaseReconciler) UpdateLease(serverId string, ip string, endpointPorts []corev1.EndpointPort) error { + // reconcile endpoints only if apiserver was not shutdown + if r.stopReconcilingCalled.Load() { + return nil + } + + // we use the serverID as the key to avoid using the server IP, port as the key. + // note: this means that this lease doesn't enforce mutual exclusion of ip/port usage between apiserver. + key := path.Join(r.serverLeases.baseKey, serverId) + return r.serverLeases.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) { + existing := input.(*corev1.Endpoints) + existing.Subsets = []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: ip}}, + Ports: endpointPorts, + }, + } + + // store this server's identity (serverId) as a label. This will be used by + // peers to find the IP of this server when the peer can not serve a request + // due to version skew. + if existing.Labels == nil { + existing.Labels = map[string]string{} + } + existing.Labels[APIServerIdentityLabel] = serverId + + // leaseTime needs to be in seconds + leaseTime := uint64(r.serverLeases.leaseTime / time.Second) + + // NB: GuaranteedUpdate does not perform the store operation unless + // something changed between load and store (not including resource + // version), meaning we can't refresh the TTL without actually + // changing a field. 
+ existing.Generation++ + + klog.V(6).Infof("Resetting TTL on server IP %q listed in storage to %v", ip, leaseTime) + return existing, &leaseTime, nil + }, nil) +} + +// ListLeases retrieves a list of the current server IPs from storage +func (r *peerEndpointLeaseReconciler) ListLeases() ([]string, error) { + storageOpts := storage.ListOptions{ + ResourceVersion: "0", + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + Predicate: storage.Everything, + Recursive: true, + } + ipInfoList, err := r.getIpInfoList(storageOpts) + if err != nil { + return nil, err + } + ipList := make([]string, 0, len(ipInfoList.Items)) + for _, ip := range ipInfoList.Items { + if len(ip.Subsets) > 0 && len(ip.Subsets[0].Addresses) > 0 && len(ip.Subsets[0].Addresses[0].IP) > 0 { + ipList = append(ipList, ip.Subsets[0].Addresses[0].IP) + } + } + klog.V(6).Infof("Current server IPs listed in storage are %v", ipList) + return ipList, nil +} + +// GetLease retrieves the server IP and port for a specific server id +func (r *peerEndpointLeaseReconciler) GetLease(serverId string) (string, error) { + var fullAddr string + if serverId == "" { + return "", fmt.Errorf("error getting endpoint for serverId: empty serverId") + } + storageOpts := storage.ListOptions{ + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + Predicate: storage.Everything, + Recursive: true, + } + ipInfoList, err := r.getIpInfoList(storageOpts) + if err != nil { + return "", err + } + + for _, ip := range ipInfoList.Items { + if ip.Labels[APIServerIdentityLabel] == serverId { + if len(ip.Subsets) > 0 { + var ipStr, portStr string + if len(ip.Subsets[0].Addresses) > 0 { + if len(ip.Subsets[0].Addresses[0].IP) > 0 { + ipStr = ip.Subsets[0].Addresses[0].IP + } + } + if len(ip.Subsets[0].Ports) > 0 { + portStr = fmt.Sprint(ip.Subsets[0].Ports[0].Port) + } + fullAddr = net.JoinHostPort(ipStr, portStr) + break + } + } + } + klog.V(6).Infof("Fetched this server IP for the specified apiserverId %v, %v", 
serverId, fullAddr) + return fullAddr, nil +} + +func (r *peerEndpointLeaseReconciler) StopReconciling() { + r.stopReconcilingCalled.Store(true) +} + +// RemoveLease removes the lease on a server IP in storage +// We use the first element in endpointPorts as a part of the lease's base key +// This is done to support out tests that simulate 2 apiservers running on the same ip but +// different ports +func (r *peerEndpointLeaseReconciler) RemoveLease(serverId string) error { + key := path.Join(r.serverLeases.baseKey, serverId) + return r.serverLeases.storage.Delete(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil) +} + +func (r *peerEndpointLeaseReconciler) Destroy() { + r.serverLeases.destroyFn() +} + +func (r *peerEndpointLeaseReconciler) GetEndpoint(serverId string) (string, error) { + return r.GetLease(serverId) +} + +func (r *peerEndpointLeaseReconciler) getIpInfoList(storageOpts storage.ListOptions) (*corev1.EndpointsList, error) { + ipInfoList := &corev1.EndpointsList{} + if err := r.serverLeases.storage.GetList(apirequest.NewDefaultContext(), r.serverLeases.baseKey, storageOpts, ipInfoList); err != nil { + return nil, err + } + return ipInfoList, nil +} + +// createEndpointPortSpec creates the endpoint ports +func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.EndpointPort { + return []corev1.EndpointPort{{ + Protocol: corev1.ProtocolTCP, + Port: int32(endpointPort), + Name: endpointPortName, + }} +} diff --git a/staging/src/k8s.io/apiserver/pkg/reconcilers/peer_endpoint_lease_test.go b/staging/src/k8s.io/apiserver/pkg/reconcilers/peer_endpoint_lease_test.go new file mode 100644 index 00000000000..07e5f1e9c1e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/reconcilers/peer_endpoint_lease_test.go @@ -0,0 +1,278 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconcilers + +import ( + "reflect" + "sort" + "testing" + "time" + + "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/apitesting" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storage" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/storagebackend/factory" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" +) + +func init() { + var scheme = runtime.NewScheme() + + metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) + utilruntime.Must(corev1.AddToScheme(scheme)) + utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion)) + + codecs = serializer.NewCodecFactory(scheme) +} + +var codecs serializer.CodecFactory + +type serverInfo struct { + existingIP string + id string + ports []corev1.EndpointPort + newIP string + removeLease bool + expectEndpoint string +} + +func NewFakePeerEndpointReconciler(t *testing.T, s storage.Interface) peerEndpointLeaseReconciler { + // use the same base key used by the controlplane, but add a random + // prefix so we can reuse the etcd instance for subtests independently. 
+ base := "/" + uuid.New().String() + "/peerserverleases/" + return peerEndpointLeaseReconciler{serverLeases: &peerEndpointLeases{ + storage: s, + destroyFn: func() {}, + baseKey: base, + leaseTime: 1 * time.Minute, // avoid the lease to timeout on tests + }} +} + +func (f *peerEndpointLeaseReconciler) SetKeys(servers []serverInfo) error { + for _, server := range servers { + if err := f.UpdateLease(server.id, server.existingIP, server.ports); err != nil { + return err + } + } + return nil +} + +func TestPeerEndpointLeaseReconciler(t *testing.T) { + // enable feature flags + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() + + server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + t.Cleanup(func() { server.Terminate(t) }) + + newFunc := func() runtime.Object { return &corev1.Endpoints{} } + sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) + + s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc) + if err != nil { + t.Fatalf("Error creating storage: %v", err) + } + t.Cleanup(dFunc) + + tests := []struct { + testName string + servers []serverInfo + expectLeases []string + }{ + { + testName: "existing IP satisfy", + servers: []serverInfo{{ + existingIP: "4.3.2.1", + id: "server-1", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + expectEndpoint: "4.3.2.1:8080", + }, { + existingIP: "1.2.3.4", + id: "server-2", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + expectEndpoint: "1.2.3.4:8080", + }}, + expectLeases: []string{"4.3.2.1", "1.2.3.4"}, + }, + { + testName: "existing IP + new IP = should return the new IP", + servers: []serverInfo{{ + existingIP: "4.3.2.2", + id: "server-1", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, 
Protocol: "TCP"}}, + newIP: "4.3.2.1", + expectEndpoint: "4.3.2.1:8080", + }, { + existingIP: "1.2.3.4", + id: "server-2", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + newIP: "1.1.1.1", + expectEndpoint: "1.1.1.1:8080", + }}, + expectLeases: []string{"4.3.2.1", "1.1.1.1"}, + }, + { + testName: "no existing IP, should return new IP", + servers: []serverInfo{{ + id: "server-1", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + newIP: "1.2.3.4", + expectEndpoint: "1.2.3.4:8080", + }}, + expectLeases: []string{"1.2.3.4"}, + }, + } + for _, test := range tests { + t.Run(test.testName, func(t *testing.T) { + fakeReconciler := NewFakePeerEndpointReconciler(t, s) + err := fakeReconciler.SetKeys(test.servers) + if err != nil { + t.Errorf("unexpected error creating keys: %v", err) + } + + for _, server := range test.servers { + if server.newIP != "" { + err = fakeReconciler.UpdateLease(server.id, server.newIP, server.ports) + if err != nil { + t.Errorf("unexpected error reconciling: %v", err) + } + } + } + + leases, err := fakeReconciler.ListLeases() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort for comparison + sort.Strings(leases) + sort.Strings(test.expectLeases) + if !reflect.DeepEqual(leases, test.expectLeases) { + t.Errorf("expected %v got: %v", test.expectLeases, leases) + } + + for _, server := range test.servers { + endpoint, err := fakeReconciler.GetLease(server.id) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if endpoint != server.expectEndpoint { + t.Errorf("expected %v got: %v", server.expectEndpoint, endpoint) + } + } + }) + } +} + +func TestPeerLeaseRemoveEndpoints(t *testing.T) { + // enable feature flags + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() + + 
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + t.Cleanup(func() { server.Terminate(t) }) + + newFunc := func() runtime.Object { return &corev1.Endpoints{} } + sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) + + s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc) + if err != nil { + t.Fatalf("Error creating storage: %v", err) + } + t.Cleanup(dFunc) + + stopTests := []struct { + testName string + servers []serverInfo + expectLeases []string + apiServerStartup bool + }{ + { + testName: "successful remove previous endpoints before apiserver starts", + servers: []serverInfo{ + { + existingIP: "1.2.3.4", + id: "test-server-1", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + removeLease: true, + }, + { + existingIP: "2.4.6.8", + id: "test-server-2", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + expectLeases: []string{"2.4.6.8"}, + apiServerStartup: true, + }, + { + testName: "stop reconciling with new IP not in existing ip list", + servers: []serverInfo{{ + existingIP: "1.2.3.4", + newIP: "4.6.8.9", + id: "test-server-1", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }, + { + existingIP: "2.4.6.8", + id: "test-server-2", + ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + removeLease: true, + }}, + expectLeases: []string{"1.2.3.4"}, + }, + } + for _, test := range stopTests { + t.Run(test.testName, func(t *testing.T) { + fakeReconciler := NewFakePeerEndpointReconciler(t, s) + err := fakeReconciler.SetKeys(test.servers) + if err != nil { + t.Errorf("unexpected error creating keys: %v", err) + } + if !test.apiServerStartup { + fakeReconciler.StopReconciling() + } + for _, server := range test.servers { + if server.removeLease { + err = fakeReconciler.RemoveLease(server.id) + // if the ip is not on the endpoints, it must return an storage error and stop 
// hostnameFunc returns the hostname that NewConfig uses for the apiserver
// identity. It defaults to os.Hostname and is meant to be replaced only in
// tests that simulate several apiservers and therefore need distinct ids.
var hostnameFunc = os.Hostname

// SetHostnameFuncForTests replaces hostnameFunc with a function that always
// reports name and a nil error. For testing purpose only.
func SetHostnameFuncForTests(name string) {
	hostnameFunc = func() (string, error) {
		return name, nil
	}
}
"k8s.io/component-base/metrics/legacyregistry" ) @@ -50,4 +51,5 @@ func register() { cachermetrics.Register() etcd3metrics.Register() flowcontrolmetrics.Register() + peerproxymetrics.Register() } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go new file mode 100644 index 00000000000..48b89be75ff --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go @@ -0,0 +1,56 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + subsystem = "apiserver" + statuscode = "code" +) + +var registerMetricsOnce sync.Once + +var ( + // peerProxiedRequestsTotal counts the number of requests that were proxied to a peer kube-apiserver. 
+ peerProxiedRequestsTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: subsystem, + Name: "rerouted_request_total", + Help: "Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it", + StabilityLevel: metrics.ALPHA, + }, + []string{statuscode}, + ) +) + +func Register() { + registerMetricsOnce.Do(func() { + legacyregistry.MustRegister(peerProxiedRequestsTotal) + }) +} + +// IncPeerProxiedRequest increments the # of proxied requests to peer kube-apiserver +func IncPeerProxiedRequest(ctx context.Context, status string) { + peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1) +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go new file mode 100644 index 00000000000..7ea4d5c4b47 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go @@ -0,0 +1,67 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peerproxy + +import ( + "net/http" + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/reconcilers" + "k8s.io/apiserver/pkg/storageversion" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" +) + +// Interface defines how the Unknown Version Proxy filter interacts with the underlying system. 
+type Interface interface { + WrapHandler(handler http.Handler) http.Handler + WaitForCacheSync(stopCh <-chan struct{}) error + HasFinishedSync() bool +} + +// New creates a new instance to implement unknown version proxy +func NewPeerProxyHandler(informerFactory kubeinformers.SharedInformerFactory, + svm storageversion.Manager, + proxyTransport http.RoundTripper, + serverId string, + reconciler reconcilers.PeerEndpointLeaseReconciler, + serializer runtime.NegotiatedSerializer) *peerProxyHandler { + h := &peerProxyHandler{ + name: "PeerProxyHandler", + storageversionManager: svm, + proxyTransport: proxyTransport, + svMap: sync.Map{}, + serverId: serverId, + reconciler: reconciler, + serializer: serializer, + } + svi := informerFactory.Internal().V1alpha1().StorageVersions() + h.storageversionInformer = svi.Informer() + + svi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + h.addSV(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + h.updateSV(oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + h.deleteSV(obj) + }}) + return h +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go new file mode 100644 index 00000000000..bc342165b21 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go @@ -0,0 +1,357 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package peerproxy
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"net"
+	"net/http"
+	"net/url"
+	"strconv"
+	"strings"
+	"sync"
+	"sync/atomic"
+
+	"k8s.io/api/apiserverinternal/v1alpha1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/runtime"
+	schema "k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/proxy"
+	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
+	epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
+	apirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/endpoints/responsewriter"
+	"k8s.io/apiserver/pkg/reconcilers"
+	"k8s.io/apiserver/pkg/storageversion"
+	"k8s.io/apiserver/pkg/util/peerproxy/metrics"
+	apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/transport"
+	"k8s.io/klog/v2"
+)
+
+const (
+	// PeerProxiedHeader is set to "true" on a request that has already been
+	// forwarded by a peer, so the receiving apiserver serves it locally
+	// instead of proxying it again.
+	PeerProxiedHeader = "x-kubernetes-peer-proxied"
+)
+
+type peerProxyHandler struct {
+	name string
+	// StorageVersion informer used to fetch apiserver ids that can serve a resource
+	storageversionInformer cache.SharedIndexInformer
+
+	// StorageVersion manager used to ensure it has finished updating storageversions before
+	// we start handling external requests
+	storageversionManager storageversion.Manager
+
+	// proxy transport
+	proxyTransport http.RoundTripper
+
+	// identity for this server
+	serverId string
+
+	// reconciler that is used to fetch host port of peer apiserver when proxying request to a peer
+	reconciler reconcilers.PeerEndpointLeaseReconciler
+
+	serializer runtime.NegotiatedSerializer
+
+	// SyncMap for storing an up to date copy of the storageversions and apiservers that can serve them
+	// This map is populated using the StorageVersion informer
+	// This map has key set to GVR and value being another SyncMap
+	// The nested SyncMap has key set to apiserver id and value set to boolean
+	// The nested maps are created to have a "Set" like structure to store unique apiserver ids
+	// for a given GVR
+	svMap sync.Map
+
+	// finishedSync is flipped to true by WaitForCacheSync once the informer
+	// and the StorageVersion manager report completion.
+	finishedSync atomic.Bool
+}
+
+// serviceableByResponse is the result of looking up which apiservers can
+// serve a GVR.
+type serviceableByResponse struct {
+	// locallyServiceable is true when this apiserver is listed for the GVR.
+	locallyServiceable bool
+	// errorFetchingAddressFromLease is true when the address of at least one
+	// peer could not be read from its lease or was malformed.
+	errorFetchingAddressFromLease bool
+	// peerEndpoints holds the host:port of every peer whose address resolved.
+	peerEndpoints []string
+}
+
+// responder implements rest.Responder for assisting a connector in writing objects or errors.
+type responder struct {
+	w   http.ResponseWriter
+	ctx context.Context
+}
+
+// HasFinishedSync reports whether WaitForCacheSync has completed successfully.
+func (h *peerProxyHandler) HasFinishedSync() bool {
+	return h.finishedSync.Load()
+}
+
+// WaitForCacheSync blocks until the StorageVersion informer has synced and the
+// StorageVersion manager has completed, or stopCh is closed. It returns an
+// error when the wait is interrupted before both conditions hold.
+func (h *peerProxyHandler) WaitForCacheSync(stopCh <-chan struct{}) error {
+
+	ok := cache.WaitForNamedCacheSync("unknown-version-proxy", stopCh, h.storageversionInformer.HasSynced, h.storageversionManager.Completed)
+	if !ok {
+		return fmt.Errorf("error while waiting for initial cache sync")
+	}
+	klog.V(3).Infof("setting finishedSync to true")
+	h.finishedSync.Store(true)
+	return nil
+}
+
+// WrapHandler will fetch the apiservers that can serve the request and either serve it locally
+// or route it to a peer
+func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		ctx := r.Context()
+		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
+		if !ok {
+			responsewriters.InternalError(w, r, errors.New("no RequestInfo found in the context"))
+			return
+		}
+
+		// Allow non-resource requests
+		if !requestInfo.IsResourceRequest {
+			klog.V(3).Infof("Not a resource request skipping proxying")
+			handler.ServeHTTP(w, r)
+			return
+		}
+
+		// Request has already been proxied once, it must be served locally
+		if r.Header.Get(PeerProxiedHeader) == "true" {
+			klog.V(3).Infof("Already rerouted once, skipping proxying to peer")
+			handler.ServeHTTP(w, r)
+			return
+		}
+
+		// StorageVersion Informers and/or StorageVersionManager is not synced yet, pass request to next handler
+		// This will happen for self requests from the kube-apiserver because we have a poststarthook
+		// to ensure that external requests are not served until the StorageVersion Informer and
+		// StorageVersionManager has synced
+		if !h.HasFinishedSync() {
+			handler.ServeHTTP(w, r)
+			return
+		}
+
+		gvr := schema.GroupVersionResource{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion, Resource: requestInfo.Resource}
+		if requestInfo.APIGroup == "" {
+			gvr.Group = "core"
+		}
+
+		// find servers that are capable of serving this request
+		serviceableByResp, err := h.findServiceableByServers(gvr, h.serverId, h.reconciler)
+		if err != nil {
+			// this means that resource is an aggregated API or a CR since it wasn't found in SV informer cache, pass as it is
+			handler.ServeHTTP(w, r)
+			return
+		}
+		// found the gvr locally, pass request to the next handler in local apiserver
+		if serviceableByResp.locallyServiceable {
+			handler.ServeHTTP(w, r)
+			return
+		}
+
+		if len(serviceableByResp.peerEndpoints) == 0 {
+			// Peers are registered for this GVR but none of their addresses could
+			// be resolved: report the failure instead of a misleading 404.
+			// A failed lookup for one peer must not fail the request while
+			// another peer is still reachable, hence the check lives here.
+			if serviceableByResp.errorFetchingAddressFromLease {
+				gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
+				klog.Errorf("error fetching ip and port of remote server while proxying request for GVR %v", gvr)
+				responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
+				return
+			}
+
+			// no apiservers were found that could serve the request, pass request to
+			// next handler, that should eventually serve 404
+
+			// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
+			// consult the storageversion-informed map for those
+			klog.Errorf("GVR %v is not served by anything in this cluster", gvr)
+			handler.ServeHTTP(w, r)
+			return
+		}
+
+		// otherwise, randomly select an apiserver and proxy request to it
+		peerIndex := rand.Intn(len(serviceableByResp.peerEndpoints))
+		h.proxyRequestToDestinationAPIServer(r, w, serviceableByResp.peerEndpoints[peerIndex])
+	})
+}
+
+// findServiceableByServers looks up gvr in svMap and resolves the address of
+// every peer registered for it through reconciler. It returns an error when
+// svMap has no entry for gvr. When localAPIServerId is registered the lookup
+// stops early with locallyServiceable set.
+func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource, localAPIServerId string, reconciler reconcilers.PeerEndpointLeaseReconciler) (serviceableByResponse, error) {
+
+	apiserversi, ok := h.svMap.Load(gvr)
+
+	// no value found for the requested gvr in svMap
+	if !ok || apiserversi == nil {
+		return serviceableByResponse{}, fmt.Errorf("no StorageVersions found for the GVR: %v", gvr)
+	}
+	apiservers := apiserversi.(*sync.Map)
+	response := serviceableByResponse{}
+	var peerServerEndpoints []string
+	apiservers.Range(func(key, value interface{}) bool {
+		apiserverKey := key.(string)
+		if apiserverKey == localAPIServerId {
+			// No lease lookup happened here, so the lease-error flag is left alone.
+			response.locallyServiceable = true
+			// stop iteration
+			return false
+		}
+
+		hostPort, err := reconciler.GetEndpoint(apiserverKey)
+		if err != nil {
+			response.errorFetchingAddressFromLease = true
+			klog.Errorf("failed to get peer ip from storage lease for server %s", apiserverKey)
+			// continue with iteration
+			return true
+		}
+		// check ip format
+		_, _, err = net.SplitHostPort(hostPort)
+		if err != nil {
+			response.errorFetchingAddressFromLease = true
+			klog.Errorf("invalid address found for server %s", apiserverKey)
+			// continue with iteration
+			return true
+		}
+		peerServerEndpoints = append(peerServerEndpoints, hostPort)
+		// continue with iteration
+		return true
+	})
+
+	response.peerEndpoints = peerServerEndpoints
+	return response, nil
+}
+
+// proxyRequestToDestinationAPIServer forwards req to the peer at host
+// (host:port) over https, impersonating the authenticated user, and records
+// the resulting status code in the proxied-request metric.
+func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
+	user, ok := apirequest.UserFrom(req.Context())
+	if !ok {
+		// Always answer the client; returning silently would surface as an
+		// empty 200 response.
+		klog.Errorf("failed to get user info from request")
+		responsewriters.InternalError(rw, req, errors.New("no user info found in the request context"))
+		return
+	}
+
+	// write a new location based on the existing request pointed at the target service
+	location := &url.URL{}
+	location.Scheme = "https"
+	location.Host = host
+	location.Path = req.URL.Path
+	location.RawQuery = req.URL.Query().Encode()
+
+	newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, req)
+	defer cancelFn()
+	// Set (not Add): the peer reads the first value of this header, so any
+	// value already present on the incoming request must be replaced.
+	newReq.Header.Set(PeerProxiedHeader, "true")
+
+	proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), h.proxyTransport)
+
+	delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw}
+	w := responsewriter.WrapForHTTP1Or2(delegate)
+
+	handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()})
+	handler.ServeHTTP(w, newReq)
+	// Increment the count of proxied requests
+	metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status()))
+}
+
+// Error reports a proxying failure to the client as 503 Service Unavailable.
+func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
+	klog.Errorf("Error while proxying request to destination apiserver: %v", err)
+	http.Error(w, err.Error(), http.StatusServiceUnavailable)
+}
+
+// Adds a storageversion object to SVMap
+func (h *peerProxyHandler) addSV(obj interface{}) {
+	sv, ok := obj.(*v1alpha1.StorageVersion)
+	if !ok {
+		klog.Errorf("Invalid StorageVersion provided to addSV()")
+		return
+	}
+	h.updateSVMap(nil, sv)
+}
+
+// Updates the SVMap to delete old storageversion and add new storageversion
+func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
+	oldSV, ok := oldObj.(*v1alpha1.StorageVersion)
+	if !ok {
+		klog.Errorf("Invalid StorageVersion provided to updateSV()")
+		return
+	}
+	newSV, ok := newObj.(*v1alpha1.StorageVersion)
+	if !ok {
+		klog.Errorf("Invalid StorageVersion provided to updateSV()")
+		return
+	}
+	h.updateSVMap(oldSV, newSV)
+}
+
+// Deletes a storageversion object from SVMap
+func (h *peerProxyHandler) deleteSV(obj interface{}) {
+	// A delete that the informer missed is delivered wrapped in a tombstone;
+	// unwrap it so the stale entries are still removed.
+	if tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown); isTombstone {
+		obj = tombstone.Obj
+	}
+	sv, ok := obj.(*v1alpha1.StorageVersion)
+	if !ok {
+		klog.Errorf("Invalid StorageVersion provided to deleteSV()")
+		return
+	}
+	h.updateSVMap(sv, nil)
+}
+
+// Delete old storageversion, add new storageversion
+func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) {
+	if oldSV != nil {
+		// delete old SV entries
+		h.deleteSVFromMap(oldSV)
+	}
+	if newSV != nil {
+		// add new SV entries
+		h.addSVToMap(newSV)
+	}
+}
+
+// groupResourceFromSVName splits a StorageVersion name of the form
+// "group.resource" at its last dot. ok is false when the name has no dot.
+func groupResourceFromSVName(name string) (group, resource string, ok bool) {
+	splitInd := strings.LastIndex(name, ".")
+	if splitInd < 0 {
+		return "", "", false
+	}
+	return name[:splitInd], name[splitInd+1:], true
+}
+
+// versionWithoutGroup strips the "group/" prefix from a served version of the
+// form "group/version"; any other input is returned unchanged.
+func versionWithoutGroup(version string) string {
+	versionSplit := strings.Split(version, "/")
+	if len(versionSplit) == 2 {
+		return versionSplit[1]
+	}
+	return version
+}
+
+// deleteSVFromMap removes the svMap entry of every GVR served according to sv.
+func (h *peerProxyHandler) deleteSVFromMap(sv *v1alpha1.StorageVersion) {
+	// The name of a StorageVersion is "group.resource"
+	group, resource, ok := groupResourceFromSVName(sv.Name)
+	if !ok {
+		klog.Errorf("Invalid StorageVersion name %q, expected group.resource", sv.Name)
+		return
+	}
+
+	gvr := schema.GroupVersionResource{Group: group, Resource: resource}
+	for _, gr := range sv.Status.StorageVersions {
+		for _, version := range gr.ServedVersions {
+			gvr.Version = versionWithoutGroup(version)
+			h.svMap.Delete(gvr)
+		}
+	}
+}
+
+// addSVToMap records, for every GVR served according to sv, the id of each
+// apiserver that serves it.
+func (h *peerProxyHandler) addSVToMap(sv *v1alpha1.StorageVersion) {
+	// The name of a StorageVersion is "group.resource"
+	group, resource, ok := groupResourceFromSVName(sv.Name)
+	if !ok {
+		klog.Errorf("Invalid StorageVersion name %q, expected group.resource", sv.Name)
+		return
+	}
+
+	gvr := schema.GroupVersionResource{Group: group, Resource: resource}
+	for _, gr := range sv.Status.StorageVersions {
+		for _, version := range gr.ServedVersions {
+			// some versions have groups included in them, so get rid of the groups
+			gvr.Version = versionWithoutGroup(version)
+			apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
+			apiservers := apiserversi.(*sync.Map)
+			apiservers.Store(gr.APIServerID, true)
+		}
+	}
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go
new file mode 100644
index 00000000000..9b1cc9b5eb8
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go
@@ -0,0 +1,329 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package peerproxy
+
+import (
+	"net/http"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"net/http/httptest"
+
+	"github.com/google/uuid"
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/apitesting"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apiserver/pkg/authentication/user"
+	apifilters "k8s.io/apiserver/pkg/endpoints/filters"
+	apirequest "k8s.io/apiserver/pkg/endpoints/request"
+	"k8s.io/apiserver/pkg/features"
+	"k8s.io/apiserver/pkg/reconcilers"
+	etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
+	"k8s.io/apiserver/pkg/storageversion"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/apiserver/pkg/util/peerproxy/metrics"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/transport"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	"k8s.io/component-base/metrics/legacyregistry"
+	"k8s.io/component-base/metrics/testutil"
+)
+
+const (
+	requestTimeout = 30 * time.Second
+	localServerId  = "local-apiserver"
+	remoteServerId = "remote-apiserver"
+)
+
+// FakeSVMapData is a single GVR -> apiserver id entry seeded into the
+// handler's storage version map before a test case runs.
+type FakeSVMapData struct {
+	gvr      schema.GroupVersionResource
+	serverId string
+}
+
+// reconciler describes the peer lease to publish for a test case; nothing is
+// published unless do is true.
+type reconciler struct {
+	do       bool
+	publicIP string
+	serverId string
+}
+
+// TestPeerProxy drives requests through the peer proxy handler and checks the
+// status code returned for each routing decision.
+func TestPeerProxy(t *testing.T) {
+	testCases := []struct {
+		desc                 string
+		svdata               FakeSVMapData
+		informerFinishedSync bool
+		requestPath          string
+		peerproxiedHeader    string
+		expectedStatus       int
+		metrics              []string
+		want                 string
+		reconcilerConfig     reconciler
+	}{
+		{
+			desc:           "allow non resource requests",
+			requestPath:    "/foo/bar/baz",
+			expectedStatus: http.StatusOK,
+		},
+		{
+			desc:              "allow if already proxied once",
+			requestPath:       "/api/bar/baz",
+			expectedStatus:    http.StatusOK,
+			peerproxiedHeader: "true",
+		},
+		{
+			desc:                 "allow if unsynced informers",
+			requestPath:          "/api/bar/baz",
+			expectedStatus:       http.StatusOK,
+			informerFinishedSync: false,
+		},
+		{
+			desc:                 "allow if no storage version found",
+			requestPath:          "/api/bar/baz",
+			expectedStatus:       http.StatusOK,
+			informerFinishedSync: true,
+		},
+		{
+			// since if no server id is found, we pass request to next handler
+			//, and our last handler in local chain is an http ok handler
+			desc:                 "200 if no serverid found",
+			requestPath:          "/api/bar/baz",
+			expectedStatus:       http.StatusOK,
+			informerFinishedSync: true,
+			svdata: FakeSVMapData{
+				gvr: schema.GroupVersionResource{
+					Group:    "core",
+					Version:  "bar",
+					Resource: "baz"},
+				serverId: ""},
+		},
+		{
+			desc:                 "503 if no endpoint fetched from lease",
+			requestPath:          "/api/foo/bar",
+			expectedStatus:       http.StatusServiceUnavailable,
+			informerFinishedSync: true,
+			svdata: FakeSVMapData{
+				gvr: schema.GroupVersionResource{
+					Group:    "core",
+					Version:  "foo",
+					Resource: "bar"},
+				serverId: remoteServerId},
+		},
+		{
+			desc:                 "200 if locally serviceable",
+			requestPath:          "/api/foo/bar",
+			expectedStatus:       http.StatusOK,
+			informerFinishedSync: true,
+			svdata: FakeSVMapData{
+				gvr: schema.GroupVersionResource{
+					Group:    "core",
+					Version:  "foo",
+					Resource: "bar"},
+				serverId: localServerId},
+		},
+		{
+			desc:                 "503 unreachable peer bind address",
+			requestPath:          "/api/foo/bar",
+			expectedStatus:       http.StatusServiceUnavailable,
+			informerFinishedSync: true,
+			svdata: FakeSVMapData{
+				gvr: schema.GroupVersionResource{
+					Group:    "core",
+					Version:  "foo",
+					Resource: "bar"},
+				serverId: remoteServerId},
+			reconcilerConfig: reconciler{
+				do:       true,
+				publicIP: "1.2.3.4",
+				serverId: remoteServerId,
+			},
+			metrics: []string{
+				"apiserver_rerouted_request_total",
+			},
+			want: `
+				# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
+				# TYPE apiserver_rerouted_request_total counter
+				apiserver_rerouted_request_total{code="503"} 1
+				`,
+		},
+		{
+			// NOTE: the counter is process-wide and is not reset between cases,
+			// so the expected value of 2 relies on the previous case having run.
+			desc:                 "503 unreachable peer public address",
+			requestPath:          "/api/foo/bar",
+			expectedStatus:       http.StatusServiceUnavailable,
+			informerFinishedSync: true,
+			svdata: FakeSVMapData{
+				gvr: schema.GroupVersionResource{
+					Group:    "core",
+					Version:  "foo",
+					Resource: "bar"},
+				serverId: remoteServerId},
+			reconcilerConfig: reconciler{
+				do:       true,
+				publicIP: "1.2.3.4",
+				serverId: remoteServerId,
+			},
+			metrics: []string{
+				"apiserver_rerouted_request_total",
+			},
+			want: `
+				# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
+				# TYPE apiserver_rerouted_request_total counter
+				apiserver_rerouted_request_total{code="503"} 2
+				`,
+		},
+	}
+
+	metrics.Register()
+	for _, tt := range testCases {
+		t.Run(tt.desc, func(t *testing.T) {
+			lastHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				w.Write([]byte("OK"))
+			})
+			reconciler := newFakePeerEndpointReconciler(t)
+			handler := newHandlerChain(t, lastHandler, reconciler, tt.informerFinishedSync, tt.svdata)
+			server, requestGetter := createHTTP2ServerWithClient(handler, requestTimeout*2)
+			defer server.Close()
+
+			if tt.reconcilerConfig.do {
+				// need to enable feature flags first
+				defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
+				defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
+
+				reconciler.UpdateLease(tt.reconcilerConfig.serverId,
+					tt.reconcilerConfig.publicIP,
+					[]corev1.EndpointPort{{Name: "foo",
+						Port: 8080, Protocol: "TCP"}})
+			}
+
+			req, err := http.NewRequest(http.MethodGet, server.URL+tt.requestPath, nil)
+			if err != nil {
+				t.Fatalf("failed to create new http request - %v", err)
+			}
+			req.Header.Set(PeerProxiedHeader, tt.peerproxiedHeader)
+
+			// Fail fast on a transport error: resp is nil in that case and
+			// must not be dereferenced below.
+			resp, err := requestGetter(req)
+			if err != nil {
+				t.Fatalf("failed to send request - %v", err)
+			}
+			defer resp.Body.Close()
+
+			// compare response
+			assert.Equal(t, tt.expectedStatus, resp.StatusCode)
+
+			// compare metric
+			if tt.want != "" {
+				if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.want), tt.metrics...); err != nil {
+					t.Fatal(err)
+				}
+			}
+		})
+	}
+
+}
+
+// newFakePeerEndpointReconciler returns a peer endpoint lease reconciler backed
+// by a throwaway etcd test server that is terminated when the test ends.
+func newFakePeerEndpointReconciler(t *testing.T) reconcilers.PeerEndpointLeaseReconciler {
+	server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
+	t.Cleanup(func() { server.Terminate(t) })
+	scheme := runtime.NewScheme()
+	metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
+	utilruntime.Must(corev1.AddToScheme(scheme))
+	utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion))
+	codecs := serializer.NewCodecFactory(scheme)
+	sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
+	config := *sc.ForResource(schema.GroupResource{Resource: "endpoints"})
+	baseKey := "/" + uuid.New().String() + "/peer-testleases/"
+	leaseTime := 1 * time.Minute
+	reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(&config, baseKey, leaseTime)
+	if err != nil {
+		t.Fatalf("Error creating storage: %v", err)
+	}
+	return reconciler
+}
+
+// newHandlerChain wraps handler with the peer proxy handler, a fake user and
+// the request-info filter, in the order a request traverses them last-to-first.
+func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.PeerEndpointLeaseReconciler, informerFinishedSync bool, svdata FakeSVMapData) http.Handler {
+	// Add peerproxy handler
+	s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
+	peerProxyHandler, err := newFakePeerProxyHandler(informerFinishedSync, reconciler, svdata, localServerId, s)
+	if err != nil {
+		t.Fatalf("Error creating peer proxy handler: %v", err)
+	}
+	peerProxyHandler.finishedSync.Store(informerFinishedSync)
+	handler = peerProxyHandler.WrapHandler(handler)
+
+	// Add user info
+	handler = withFakeUser(handler)
+
+	// Add requestInfo handler
+	requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
+	handler = apifilters.WithRequestInfo(handler, requestInfoFactory)
+	return handler
+}
+
+// newFakePeerProxyHandler builds a peerProxyHandler on a fake clientset and
+// seeds its storage version map with svdata when svdata names a full GVR.
+func newFakePeerProxyHandler(informerFinishedSync bool, reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) {
+	clientset := fake.NewSimpleClientset()
+	informerFactory := informers.NewSharedInformerFactory(clientset, 0)
+	clientConfig := &transport.Config{
+		TLS: transport.TLSConfig{
+			Insecure: false,
+		}}
+	proxyRoundTripper, err := transport.New(clientConfig)
+	if err != nil {
+		return nil, err
+	}
+	ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s)
+	if testDataExists(svdata.gvr) {
+		ppI.addToStorageVersionMap(svdata.gvr, svdata.serverId)
+	}
+	return ppI, nil
+}
+
+// addToStorageVersionMap registers gvr in svMap and, when serverId is not
+// empty, records serverId as able to serve it.
+func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverId string) {
+	apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
+	apiservers := apiserversi.(*sync.Map)
+	if serverId != "" {
+		apiservers.Store(serverId, true)
+	}
+}
+
+// testDataExists reports whether gvr has group, version and resource all set.
+func testDataExists(gvr schema.GroupVersionResource) bool {
+	return gvr.Group != "" && gvr.Version != "" && gvr.Resource != ""
+}
+
+// withFakeUser attaches a user to the request context whose groups are taken
+// from the "Groups" request header.
+func withFakeUser(handler http.Handler) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
+			Groups: r.Header["Groups"],
+		}))
+		handler.ServeHTTP(w, r)
+	})
+}
+
+// returns a started http2 server, with a client function to send request to the server.
+func createHTTP2ServerWithClient(handler http.Handler, clientTimeout time.Duration) (*httptest.Server, func(req *http.Request) (*http.Response, error)) {
+	server := httptest.NewUnstartedServer(handler)
+	server.EnableHTTP2 = true
+	server.StartTLS()
+	cli := server.Client()
+	cli.Timeout = clientTimeout
+	return server, func(req *http.Request) (*http.Response, error) {
+		return cli.Do(req)
+	}
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go
index 8f6015cfaa0..9e0bdfaf03c 100644
--- a/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go
+++ b/staging/src/k8s.io/apiserver/pkg/util/proxy/proxy.go
@@ -17,17 +17,30 @@ limitations under the License.
 package proxy
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"net"
+	"net/http"
 	"net/url"
 	"strconv"
+	"strings"
+	"time"
 
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	utilnet "k8s.io/apimachinery/pkg/util/net"
+	auditinternal "k8s.io/apiserver/pkg/apis/audit"
+	"k8s.io/apiserver/pkg/audit"
+	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
 	listersv1 "k8s.io/client-go/listers/core/v1"
 )
 
+const (
+	// taken from https://github.com/kubernetes/kubernetes/blob/release-1.27/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go#L47
+	aggregatedDiscoveryTimeout = 5 * time.Second
+)
+
 // findServicePort finds the service port by name or numerically.
func findServicePort(svc *v1.Service, port int32) (*v1.ServicePort, error) { for _, svcPort := range svc.Spec.Ports { @@ -117,3 +130,34 @@ func ResolveCluster(services listersv1.ServiceLister, namespace, id string, port return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type) } } + +// NewRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests +func NewRequestForProxy(location *url.URL, req *http.Request) (*http.Request, context.CancelFunc) { + newCtx := req.Context() + cancelFn := func() {} + + if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok { + // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three + // segments that we are going to proxy, we have a discovery request. + if !requestInfo.IsResourceRequest && len(strings.Split(strings.Trim(requestInfo.Path, "/"), "/")) == 3 { + // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that + // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) + // so forcing a short timeout here helps responsiveness of all clients. + newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout) + } + } + + // WithContext creates a shallow clone of the request with the same context. + newReq := req.WithContext(newCtx) + newReq.Header = utilnet.CloneHeader(req.Header) + newReq.URL = location + newReq.Host = location.Host + + // If the original request has an audit ID, let's make sure we propagate this + // to the aggregated server. 
+ if auditID, found := audit.AuditIDFrom(req.Context()); found { + newReq.Header.Set(auditinternal.HeaderAuditID, string(auditID)) + } + + return newReq, cancelFn +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index bc4da3a8946..73e818d261b 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/endpoints/discovery/aggregated" genericfeatures "k8s.io/apiserver/pkg/features" + peerreconcilers "k8s.io/apiserver/pkg/reconcilers" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/apiserver/pkg/server/egressselector" @@ -76,6 +77,16 @@ const ( // ExtraConfig represents APIServices-specific configuration type ExtraConfig struct { + // PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers' + // serving certs when routing a request to the peer in the case the request can not be served + // locally due to version skew. + PeerCAFile string + + // PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request + // to this apiserver. This happens in cases where the peer is not able to serve the request due to + // version skew. If unset, AdvertiseAddress/BindAddress will be used. + PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress + // ProxyClientCert/Key are the client cert used to identify this proxy. 
Backing APIServices use // this to confirm the proxy's identity ProxyClientCertFile string diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go index 68b81cb7572..8a647a6451c 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go @@ -17,23 +17,18 @@ limitations under the License. package apiserver import ( - "context" "net/http" "net/url" - "strings" "sync/atomic" - "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/httpstream" - utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/proxy" - auditinternal "k8s.io/apiserver/pkg/apis/audit" - "k8s.io/apiserver/pkg/audit" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy" "k8s.io/apiserver/pkg/util/x509metrics" "k8s.io/client-go/transport" "k8s.io/klog/v2" @@ -43,8 +38,6 @@ import ( const ( aggregatorComponent string = "aggregator" - - aggregatedDiscoveryTimeout = 5 * time.Second ) type certKeyFunc func() ([]byte, []byte) @@ -149,7 +142,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { location.Path = req.URL.Path location.RawQuery = req.URL.Query().Encode() - newReq, cancelFn := newRequestForProxy(location, req) + newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, req) defer cancelFn() if handlingInfo.proxyRoundTripper == nil { @@ -177,37 +170,6 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { handler.ServeHTTP(w, newReq) } -// newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests -func 
newRequestForProxy(location *url.URL, req *http.Request) (*http.Request, context.CancelFunc) { - newCtx := req.Context() - cancelFn := func() {} - - if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok { - // trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three - // segments that we are going to proxy, we have a discovery request. - if !requestInfo.IsResourceRequest && len(strings.Split(strings.Trim(requestInfo.Path, "/"), "/")) == 3 { - // discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that - // should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions) - // so forcing a short timeout here helps responsiveness of all clients. - newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout) - } - } - - // WithContext creates a shallow clone of the request with the same context. - newReq := req.WithContext(newCtx) - newReq.Header = utilnet.CloneHeader(req.Header) - newReq.URL = location - newReq.Host = location.Host - - // If the original request has an audit ID, let's make sure we propagate this - // to the aggregated server. - if auditID, found := audit.AuditIDFrom(req.Context()); found { - newReq.Header.Set(auditinternal.HeaderAuditID, string(auditID)) - } - - return newReq, cancelFn -} - // responder implements rest.Responder for assisting a connector in writing objects or errors. 
type responder struct { w http.ResponseWriter diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go index 36780e53024..9372a3e0c92 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go @@ -49,6 +49,7 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/server/egressselector" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy" "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/legacyregistry" apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" @@ -747,7 +748,7 @@ func TestGetContextForNewRequest(t *testing.T) { location.Path = req.URL.Path nestedReq := req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path})) - newReq, cancelFn := newRequestForProxy(location, nestedReq) + newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, nestedReq) defer cancelFn() theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w}) @@ -802,7 +803,7 @@ func TestNewRequestForProxyWithAuditID(t *testing.T) { req = req.WithContext(ctx) } - newReq, _ := newRequestForProxy(req.URL, req) + newReq, _ := apiserverproxyutil.NewRequestForProxy(req.URL, req) if newReq == nil { t.Fatal("expected a non nil Request object") } diff --git a/test/integration/apiserver/peerproxy/main_test.go b/test/integration/apiserver/peerproxy/main_test.go new file mode 100644 index 00000000000..72bf12eeccc --- /dev/null +++ b/test/integration/apiserver/peerproxy/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peerproxy + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/apiserver/peerproxy/peer_proxy_test.go b/test/integration/apiserver/peerproxy/peer_proxy_test.go new file mode 100644 index 00000000000..9dd66340bac --- /dev/null +++ b/test/integration/apiserver/peerproxy/peer_proxy_test.go @@ -0,0 +1,244 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package peerproxy
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	v1 "k8s.io/api/batch/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apiserver/pkg/features"
+	"k8s.io/apiserver/pkg/server"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/transport"
+	"k8s.io/client-go/util/cert"
+	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	"k8s.io/klog/v2"
+	kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
+	"k8s.io/kubernetes/pkg/controller/storageversiongc"
+	"k8s.io/kubernetes/pkg/controlplane"
+	kubefeatures "k8s.io/kubernetes/pkg/features"
+
+	"k8s.io/kubernetes/test/integration/framework"
+	testutil "k8s.io/kubernetes/test/utils"
+	"k8s.io/kubernetes/test/utils/ktesting"
+)
+
+// TestPeerProxiedRequest starts two apiservers on a shared etcd, one of them
+// with batch/v1 disabled, creates a Job through the first and verifies that
+// listing Jobs through the second succeeds and returns that Job.
+func TestPeerProxiedRequest(t *testing.T) {
+
+	ktesting.SetDefaultVerbosity(1)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	t.Cleanup(cancel)
+
+	// ensure to stop cert reloading after shutdown
+	transport.DialerStopCh = ctx.Done()
+
+	// enable feature flags
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()
+
+	// create sharedetcd
+	etcd := framework.SharedEtcd()
+
+	// create certificates for aggregation and client-cert auth
+	proxyCA, err := createProxyCertContent()
+	require.NoError(t, err)
+
+	// start test server with all APIs enabled
+	// override hostname to ensure unique ips
+	server.SetHostnameFuncForTests("test-server-a")
+	serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
+		EnableCertAuth: true,
+		ProxyCA:        &proxyCA},
+		[]string{}, etcd)
+	defer serverA.TearDownFn()
+
+	// start another test server with some api disabled
+	// override hostname to ensure unique ips
+	server.SetHostnameFuncForTests("test-server-b")
+	serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
+		EnableCertAuth: true,
+		ProxyCA:        &proxyCA},
+		[]string{fmt.Sprintf("--runtime-config=%s", "batch/v1=false")}, etcd)
+	defer serverB.TearDownFn()
+
+	kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
+	require.NoError(t, err)
+
+	kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
+	require.NoError(t, err)
+
+	// create jobs resource using serverA
+	job := createJobResource()
+	_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
+	require.NoError(t, err)
+
+	klog.Infof("\nServerA has created jobs\n")
+
+	// List jobs using ServerB
+	// This request should be proxied to ServerA since ServerB does not have batch API enabled
+	jobsB, err := kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
+	// Check the error before touching jobsB, and require a non-empty list
+	// before indexing into it.
+	require.NoError(t, err)
+	klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
+	require.NotEmpty(t, jobsB.Items)
+	assert.Equal(t, job.Name, jobsB.Items[0].Name)
+}
+
+func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
+
+	ktesting.SetDefaultVerbosity(1)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	t.Cleanup(cancel)
+
+	// ensure to stop cert reloading after shutdown
+	transport.DialerStopCh = ctx.Done()
+
+	// enable feature flags
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
+	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)() + + // create sharedetcd + etcd := framework.SharedEtcd() + + // create certificates for aggregation and client-cert auth + proxyCA, err := createProxyCertContent() + require.NoError(t, err) + + // set lease duration to 1s for serverA to ensure that storageversions for serverA are updated + // once it is shutdown + controlplane.IdentityLeaseDurationSeconds = 10 + controlplane.IdentityLeaseGCPeriod = time.Second + controlplane.IdentityLeaseRenewIntervalPeriod = 10 * time.Second + + // start serverA with all APIs enabled + // override hostname to ensure unique ips + server.SetHostnameFuncForTests("test-server-a") + serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd) + kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig) + require.NoError(t, err) + // ensure storageversion garbage collector ctlr is set up + informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second) + setupStorageVersionGC(ctx, kubeClientSetA, informersA) + // reset lease duration to default value for serverB and serverC since we will not be + // shutting these down + controlplane.IdentityLeaseDurationSeconds = 3600 + + // start serverB with some api disabled + // override hostname to ensure unique ips + server.SetHostnameFuncForTests("test-server-b") + serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{ + fmt.Sprintf("--runtime-config=%v", "batch/v1=false")}, etcd) + defer serverB.TearDownFn() + kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig) + require.NoError(t, err) + // ensure storageversion garbage collector ctlr is set up + informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second) + setupStorageVersionGC(ctx, 
kubeClientSetB, informersB) + + // start serverC with all APIs enabled + // override hostname to ensure unique ips + server.SetHostnameFuncForTests("test-server-c") + serverC := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd) + defer serverC.TearDownFn() + + // create jobs resource using serverA + job := createJobResource() + _, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{}) + require.NoError(t, err) + klog.Infof("\nServerA has created jobs\n") + + // shutdown serverA + serverA.TearDownFn() + + var jobsB *v1.JobList + // list jobs using ServerB which it should proxy to ServerC and get back valid response + err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) { + jobsB, err = kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{}) + if err != nil { + return false, nil + } + if jobsB != nil { + return true, nil + } + return false, nil + }) + klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items)) + require.NoError(t, err) + assert.NotEmpty(t, jobsB) + assert.Equal(t, job.Name, jobsB.Items[0].Name) +} + +func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) { + leaseInformer := informers.Coordination().V1().Leases() + storageVersionInformer := informers.Internal().V1alpha1().StorageVersions() + go leaseInformer.Informer().Run(ctx.Done()) + go storageVersionInformer.Informer().Run(ctx.Done()) + + controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer) + go controller.Run(ctx) +} + +func createProxyCertContent() (kastesting.ProxyCA, error) { + result := kastesting.ProxyCA{} + proxySigningKey, err := testutil.NewPrivateKey() + if err != nil { + return result, err + } + proxySigningCert, err := 
cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) + if err != nil { + return result, err + } + + result = kastesting.ProxyCA{ + ProxySigningCert: proxySigningCert, + ProxySigningKey: proxySigningKey, + } + return result, nil +} + +func createJobResource() *v1.Job { + return &v1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "default", + }, + Spec: v1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "test", + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } +} diff --git a/test/integration/controlplane/transformation/kms_transformation_test.go b/test/integration/controlplane/transformation/kms_transformation_test.go index f64978e6897..2bde85f284c 100644 --- a/test/integration/controlplane/transformation/kms_transformation_test.go +++ b/test/integration/controlplane/transformation/kms_transformation_test.go @@ -598,6 +598,7 @@ resources: // the following resources are not encrypted as they are not REST APIs and hence are not expected // to be encrypted because it would be impossible to perform a storage migration on them if strings.Contains(kv.String(), "masterleases") || + strings.Contains(kv.String(), "peerserverleases") || strings.Contains(kv.String(), "serviceips") || strings.Contains(kv.String(), "servicenodeports") { // assert that these resources are not encrypted with any provider diff --git a/vendor/modules.txt b/vendor/modules.txt index d2192848fdf..5da775893da 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1514,6 +1514,7 @@ k8s.io/apiserver/pkg/endpoints/warning k8s.io/apiserver/pkg/features k8s.io/apiserver/pkg/quota/v1 k8s.io/apiserver/pkg/quota/v1/generic +k8s.io/apiserver/pkg/reconcilers k8s.io/apiserver/pkg/registry/generic k8s.io/apiserver/pkg/registry/generic/registry k8s.io/apiserver/pkg/registry/generic/rest @@ -1574,6 +1575,8 @@ 
k8s.io/apiserver/pkg/util/flowcontrol/request k8s.io/apiserver/pkg/util/flushwriter k8s.io/apiserver/pkg/util/notfoundhandler k8s.io/apiserver/pkg/util/openapi +k8s.io/apiserver/pkg/util/peerproxy +k8s.io/apiserver/pkg/util/peerproxy/metrics k8s.io/apiserver/pkg/util/proxy k8s.io/apiserver/pkg/util/shufflesharding k8s.io/apiserver/pkg/util/webhook