Merge pull request #117740 from Richabanker/uvip-impl

Unknown Version Interoperability Proxy Impl
This commit is contained in:
Kubernetes Prow Robot 2023-07-18 18:36:02 -07:00 committed by GitHub
commit 66e99b3ff1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 2166 additions and 55 deletions

View File

@ -37,6 +37,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
kubeexternalinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
@ -57,6 +58,7 @@ func createAggregatorConfig(
externalInformers kubeexternalinformers.SharedInformerFactory,
serviceResolver aggregatorapiserver.ServiceResolver,
proxyTransport *http.Transport,
peerProxy utilpeerproxy.Interface,
pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {
// make a shallow copy to let us twiddle a few things
@ -76,6 +78,16 @@ func createAggregatorConfig(
genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
}
if peerProxy != nil {
originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc
genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
// Add peer proxy handler to aggregator-apiserver.
// wrap the peer proxy handler first.
apiHandler = peerProxy.WrapHandler(apiHandler)
return originalHandlerChainBuilder(apiHandler, c)
}
}
// copy the etcd options so we don't mutate originals.
// we assume that the etcd options have been completed already. avoid messing with anything outside
// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
@ -104,6 +116,8 @@ func createAggregatorConfig(
ExtraConfig: aggregatorapiserver.ExtraConfig{
ProxyClientCertFile: commandOptions.ProxyClientCertFile,
ProxyClientKeyFile: commandOptions.ProxyClientKeyFile,
PeerCAFile: commandOptions.PeerCAFile,
PeerAdvertiseAddress: commandOptions.PeerAdvertiseAddress,
ServiceResolver: serviceResolver,
ProxyTransport: proxyTransport,
RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,

View File

@ -84,7 +84,7 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
c.ApiExtensions = apiExtensions
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, pluginInitializer)
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}

View File

@ -57,6 +57,7 @@ import (
"k8s.io/klog/v2"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
@ -258,6 +259,21 @@ func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
},
}
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
config.ExtraConfig.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
if err != nil {
return nil, nil, nil, err
}
// build peer proxy config only if peer ca file exists
if opts.PeerCAFile != "" {
config.ExtraConfig.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ExtraConfig.PeerEndpointLeaseReconciler, config.GenericConfig.Serializer)
if err != nil {
return nil, nil, nil, err
}
}
}
clientCAProvider, err := opts.Authentication.ClientCert.GetClientCAContentProvider()
if err != nil {
return nil, nil, nil, err

View File

@ -18,6 +18,7 @@ package testing
import (
"context"
"crypto/rsa"
"crypto/x509"
"fmt"
"net"
@ -38,12 +39,15 @@ import (
serveroptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
clientgotransport "k8s.io/client-go/transport"
"k8s.io/client-go/util/cert"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
@ -77,6 +81,14 @@ type TestServerInstanceOptions struct {
EnableCertAuth bool
// Wrap the storage version interface of the created server's generic server.
StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager
// CA file used for requestheader authn during communication between:
// 1. kube-apiserver and peer when the local apiserver is not able to serve the request due
// to version skew
// 2. kube-apiserver and aggregated apiserver
// We specify this as on option to pass a common proxyCA to multiple apiservers to simulate
// an apiserver version skew scenario where all apiservers use the same proxyCA to verify client connections.
ProxyCA *ProxyCA
}
// TestServer return values supplied by kube-test-ApiServer
@ -95,6 +107,16 @@ type Logger interface {
Errorf(format string, args ...interface{})
Fatalf(format string, args ...interface{})
Logf(format string, args ...interface{})
Cleanup(func())
}
// ProxyCA contains the certificate authority certificate and key which is used to verify client connections
// to kube-apiservers. The clients can be :
// 1. aggregated apiservers
// 2. peer kube-apiservers
type ProxyCA struct {
ProxySigningCert *x509.Certificate
ProxySigningKey *rsa.PrivateKey
}
// NewDefaultTestServerOptions Default options for TestServer instances
@ -161,14 +183,24 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
reqHeaders := serveroptions.NewDelegatingAuthenticationOptions()
s.Authentication.RequestHeader = &reqHeaders.RequestHeader
// create certificates for aggregation and client-cert auth
proxySigningKey, err := testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
var proxySigningKey *rsa.PrivateKey
var proxySigningCert *x509.Certificate
if instanceOptions.ProxyCA != nil {
// use provided proxyCA
proxySigningKey = instanceOptions.ProxyCA.ProxySigningKey
proxySigningCert = instanceOptions.ProxyCA.ProxySigningCert
} else {
// create certificates for aggregation and client-cert auth
proxySigningKey, err = testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err = cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
}
}
proxyCACertFile := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "proxy-ca.crt")
if err := os.WriteFile(proxyCACertFile, testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil {
@ -213,6 +245,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
return result, err
}
s.Authentication.ClientCert.ClientCA = clientCACertFile
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
// TODO: set up a general clean up for testserver
if clientgotransport.DialerStopCh == wait.NeverStop {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
t.Cleanup(cancel)
clientgotransport.DialerStopCh = ctx.Done()
}
s.PeerCAFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, s.SecureServing.ServerCert.PairName+".crt")
}
}
s.SecureServing.ExternalAddress = s.SecureServing.Listener.Addr().(*net.TCPAddr).IP // use listener addr although it is a loopback device

View File

@ -28,19 +28,25 @@ import (
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/server/filters"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/apiserver/pkg/util/openapi"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
clientgoinformers "k8s.io/client-go/informers"
clientgoclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/transport"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
openapicommon "k8s.io/kube-openapi/pkg/common"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
"k8s.io/kubernetes/pkg/kubeapiserver"
@ -193,3 +199,50 @@ func BuildPriorityAndFairness(s controlplaneapiserver.CompletedOptions, extclien
s.GenericServerRunOptions.RequestTimeout/4,
), nil
}
// CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
// The peer endpoint leases are used to find network locations of apiservers for peer proxy
func CreatePeerEndpointLeaseReconciler(c genericapiserver.Config, storageFactory serverstorage.StorageFactory) (reconcilers.PeerEndpointLeaseReconciler, error) {
ttl := controlplane.DefaultEndpointReconcilerTTL
config, err := storageFactory.NewConfig(api.Resource("apiServerPeerIPInfo"))
if err != nil {
return nil, fmt.Errorf("error creating storage factory config: %w", err)
}
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(config, "/peerserverleases/", ttl)
return reconciler, err
}
func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
if proxyClientCertFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
}
if proxyClientKeyFile == "" {
return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
}
// create proxy client config
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
CertFile: proxyClientCertFile,
KeyFile: proxyClientKeyFile,
CAFile: peerCAFile,
ServerName: "kubernetes.default.svc",
}}
// build proxy transport
proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
if transportBuildingError != nil {
klog.Error(transportBuildingError.Error())
return nil, transportBuildingError
}
return utilpeerproxy.NewPeerProxyHandler(
versionedInformer,
svm,
proxyRoundTripper,
apiServerID,
reconciler,
serializer,
), nil
}

View File

@ -24,6 +24,7 @@ import (
"strings"
"time"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericoptions "k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/client-go/util/keyutil"
@ -63,6 +64,16 @@ type Options struct {
ProxyClientCertFile string
ProxyClientKeyFile string
// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
// serving certs when routing a request to the peer in the case the request can not be served
// locally due to version skew.
PeerCAFile string
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
EnableAggregatorRouting bool
AggregatorRejectForwardingRedirects bool
@ -154,6 +165,20 @@ func (s *Options) AddFlags(fss *cliflag.NamedFlagSets) {
"when it must call out during a request. This includes proxying requests to a user "+
"api-server and calling out to webhook admission plugins.")
fs.StringVar(&s.PeerCAFile, "peer-ca-file", s.PeerCAFile,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this file will be used to verify serving certificates of peer kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability.")
fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertiseIP, "peer-advertise-ip", s.PeerAdvertiseAddress.PeerAdvertiseIP,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this IP will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+
"when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability. ")
fs.StringVar(&s.PeerAdvertiseAddress.PeerAdvertisePort, "peer-advertise-port", s.PeerAdvertiseAddress.PeerAdvertisePort,
"If set and the UnknownVersionInteroperabilityProxy feature gate is enabled, this port will be used by peer kube-apiservers to proxy requests to this kube-apiserver "+
"when the request cannot be handled by the peer due to version skew between the kube-apiservers. "+
"This flag is only used in clusters configured with multiple kube-apiservers for high availability. ")
fs.BoolVar(&s.EnableAggregatorRouting, "enable-aggregator-routing", s.EnableAggregatorRouting,
"Turns on aggregator routing requests to endpoints IP rather than cluster IP.")

View File

@ -25,6 +25,7 @@ import (
genericfeatures "k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/api/legacyscheme"
)
@ -69,6 +70,32 @@ func validateAPIPriorityAndFairness(options *Options) []error {
return nil
}
func validateUnknownVersionInteroperabilityProxyFeature() []error {
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
return nil
}
return []error{fmt.Errorf("UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled")}
}
return nil
}
func validateUnknownVersionInteroperabilityProxyFlags(options *Options) []error {
err := []error{}
if !utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if options.PeerCAFile != "" {
err = append(err, fmt.Errorf("--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
if options.PeerAdvertiseAddress.PeerAdvertiseIP != "" {
err = append(err, fmt.Errorf("--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
if options.PeerAdvertiseAddress.PeerAdvertisePort != "" {
err = append(err, fmt.Errorf("--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on"))
}
}
return err
}
// Validate checks Options and return a slice of found errs.
func (s *Options) Validate() []error {
var errs []error
@ -83,6 +110,8 @@ func (s *Options) Validate() []error {
errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...)
errs = append(errs, validateTokenRequest(s)...)
errs = append(errs, s.Metrics.Validate()...)
errs = append(errs, validateUnknownVersionInteroperabilityProxyFeature()...)
errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...)
return errs
}

View File

@ -22,8 +22,13 @@ import (
kubeapiserveradmission "k8s.io/apiserver/pkg/admission"
genericoptions "k8s.io/apiserver/pkg/server/options"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/featuregate"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/kubernetes/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
)
@ -80,6 +85,83 @@ func TestValidateAPIPriorityAndFairness(t *testing.T) {
}
}
func TestValidateUnknownVersionInteroperabilityProxy(t *testing.T) {
tests := []struct {
name string
featureEnabled bool
errShouldContain string
peerCAFile string
peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
}{
{
name: "feature disabled but peerCAFile set",
featureEnabled: false,
peerCAFile: "foo",
errShouldContain: "--peer-ca-file requires UnknownVersionInteroperabilityProxy feature to be turned on",
},
{
name: "feature disabled but peerAdvertiseIP set",
featureEnabled: false,
peerAdvertiseAddress: peerreconcilers.PeerAdvertiseAddress{PeerAdvertiseIP: "1.2.3.4"},
errShouldContain: "--peer-advertise-ip requires UnknownVersionInteroperabilityProxy feature to be turned on",
},
{
name: "feature disabled but peerAdvertisePort set",
featureEnabled: false,
peerAdvertiseAddress: peerreconcilers.PeerAdvertiseAddress{PeerAdvertisePort: "1"},
errShouldContain: "--peer-advertise-port requires UnknownVersionInteroperabilityProxy feature to be turned on",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
options := &Options{
PeerCAFile: test.peerCAFile,
PeerAdvertiseAddress: test.peerAdvertiseAddress,
}
if test.featureEnabled {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.UnknownVersionInteroperabilityProxy, true)()
}
var errMessageGot string
if errs := validateUnknownVersionInteroperabilityProxyFlags(options); len(errs) > 0 {
errMessageGot = errs[0].Error()
}
if !strings.Contains(errMessageGot, test.errShouldContain) {
t.Errorf("Expected error message to contain: %q, but got: %q", test.errShouldContain, errMessageGot)
}
})
}
}
func TestValidateUnknownVersionInteroperabilityProxyFeature(t *testing.T) {
const conflict = "UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled"
tests := []struct {
name string
featuresEnabled []featuregate.Feature
}{
{
name: "enabled: UnknownVersionInteroperabilityProxy, disabled: StorageVersionAPI",
featuresEnabled: []featuregate.Feature{features.UnknownVersionInteroperabilityProxy},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
for _, feature := range test.featuresEnabled {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, true)()
}
var errMessageGot string
if errs := validateUnknownVersionInteroperabilityProxyFeature(); len(errs) > 0 {
errMessageGot = errs[0].Error()
}
if !strings.Contains(errMessageGot, conflict) {
t.Errorf("Expected error message to contain: %q, but got: %q", conflict, errMessageGot)
}
})
}
}
func TestValidateOptions(t *testing.T) {
testCases := []struct {
name string

View File

@ -61,11 +61,13 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery"
apiserverfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
"k8s.io/apiserver/pkg/registry/generic"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
serverstorage "k8s.io/apiserver/pkg/server/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
@ -83,6 +85,7 @@ import (
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/features"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/routes"
@ -157,6 +160,23 @@ type ExtraConfig struct {
EnableLogsSupport bool
ProxyTransport *http.Transport
// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
// that can not be served locally
PeerProxy utilpeerproxy.Interface
// PeerEndpointLeaseReconciler updates the peer endpoint leases
PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
// serving certs when routing a request to the peer in the case the request can not be served
// locally due to version skew.
PeerCAFile string
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
// Values to build the IP addresses used by discovery
// The range of IPs to be assigned to services with type=ClusterIP or greater
ServiceIPRange net.IPNet
@ -492,6 +512,36 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return nil
})
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
peeraddress := getPeerAddress(c.ExtraConfig.PeerAdvertiseAddress, c.GenericConfig.PublicAddress, publicServicePort)
peerEndpointCtrl := peerreconcilers.New(
c.GenericConfig.APIServerID,
peeraddress,
c.ExtraConfig.PeerEndpointLeaseReconciler,
c.ExtraConfig.EndpointReconcilerConfig.Interval,
clientset)
if err != nil {
return nil, fmt.Errorf("failed to create peer endpoint lease controller: %w", err)
}
m.GenericAPIServer.AddPostStartHookOrDie("peer-endpoint-reconciler-controller",
func(hookContext genericapiserver.PostStartHookContext) error {
peerEndpointCtrl.Start(hookContext.StopCh)
return nil
})
m.GenericAPIServer.AddPreShutdownHookOrDie("peer-endpoint-reconciler-controller",
func() error {
peerEndpointCtrl.Stop()
return nil
})
// Add PostStartHooks for Unknown Version Proxy filter.
if c.ExtraConfig.PeerProxy != nil {
m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
err := c.ExtraConfig.PeerProxy.WaitForCacheSync(context.StopCh)
return err
})
}
}
m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, clientset)
@ -539,6 +589,8 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
leaseName := m.GenericAPIServer.APIServerID
holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
peeraddress := getPeerAddress(c.ExtraConfig.PeerAdvertiseAddress, c.GenericConfig.PublicAddress, publicServicePort)
// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
controller := lease.NewController(
clock.RealClock{},
kubeClient,
@ -549,7 +601,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
leaseName,
metav1.NamespaceSystem,
// TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver.
labelAPIServerHeartbeatFunc(KubeAPIServer))
labelAPIServerHeartbeatFunc(KubeAPIServer, peeraddress))
go controller.Run(ctx)
return nil
})
@ -597,12 +649,16 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
return m, nil
}
func labelAPIServerHeartbeatFunc(identity string) lease.ProcessLeaseFunc {
func labelAPIServerHeartbeatFunc(identity string, peeraddress string) lease.ProcessLeaseFunc {
return func(lease *coordinationapiv1.Lease) error {
if lease.Labels == nil {
lease.Labels = map[string]string{}
}
if lease.Annotations == nil {
lease.Annotations = map[string]string{}
}
// This label indiciates the identity of the lease object.
lease.Labels[IdentityLeaseComponentLabelKey] = identity
@ -613,6 +669,13 @@ func labelAPIServerHeartbeatFunc(identity string) lease.ProcessLeaseFunc {
// convenience label to easily map a lease object to a specific apiserver
lease.Labels[apiv1.LabelHostname] = hostname
// Include apiserver network location <ip_port> used by peers to proxy requests between kube-apiservers
if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
if peeraddress != "" {
lease.Annotations[apiv1.AnnotationPeerAdvertiseAddress] = peeraddress
}
}
return nil
}
}
@ -752,3 +815,13 @@ func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
return ret
}
// utility function to get the apiserver address that is used by peer apiservers to proxy
// requests to this apiserver in case the peer is incapable of serving the request
func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {
if peerAdvertiseAddress.PeerAdvertiseIP != "" && peerAdvertiseAddress.PeerAdvertisePort != "" {
return net.JoinHostPort(peerAdvertiseAddress.PeerAdvertiseIP, peerAdvertiseAddress.PeerAdvertisePort)
} else {
return net.JoinHostPort(publicAddress.String(), strconv.Itoa(publicServicePort))
}
}

View File

@ -867,6 +867,12 @@ const (
// Allow the usage of options to fine-tune the topology manager policies.
TopologyManagerPolicyOptions featuregate.Feature = "TopologyManagerPolicyOptions"
// owner: @richabanker
// alpha: v1.28
//
// Proxies client to an apiserver capable of serving the request in the event of version skew.
UnknownVersionInteroperabilityProxy featuregate.Feature = "UnknownVersionInteroperabilityProxy"
// owner: @rata, @giuseppe
// kep: https://kep.k8s.io/127
// alpha: v1.25
@ -1157,6 +1163,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
TopologyManagerPolicyOptions: {Default: true, PreRelease: featuregate.Beta},
UnknownVersionInteroperabilityProxy: {Default: false, PreRelease: featuregate.Alpha},
VolumeCapacityPriority: {Default: false, PreRelease: featuregate.Alpha},
UserNamespacesSupport: {Default: false, PreRelease: featuregate.Alpha},

View File

@ -19,6 +19,10 @@ package v1
const (
LabelHostname = "kubernetes.io/hostname"
// Label value is the network location of kube-apiserver stored as <ip:port>
// Stored in APIServer Identity lease objects to view what address is used for peer proxy
AnnotationPeerAdvertiseAddress = "kubernetes.io/peer-advertise-address"
LabelTopologyZone = "topology.kubernetes.io/zone"
LabelTopologyRegion = "topology.kubernetes.io/region"

View File

@ -89,6 +89,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/pquerna/cachecontrol v0.1.0 // indirect

View File

@ -382,6 +382,7 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE=
github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM=

View File

@ -0,0 +1,364 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconcilers
import (
"context"
"fmt"
"net"
"net/http"
"path"
"strconv"
"sync"
"sync/atomic"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
)
const (
APIServerIdentityLabel = "apiserverIdentity"
)
type PeerAdvertiseAddress struct {
PeerAdvertiseIP string
PeerAdvertisePort string
}
type peerEndpointLeases struct {
storage storage.Interface
destroyFn func()
baseKey string
leaseTime time.Duration
}
type PeerEndpointLeaseReconciler interface {
// GetEndpoint retrieves the endpoint for a given apiserverId
GetEndpoint(serverId string) (string, error)
// UpdateLease updates the ip and port of peer servers
UpdateLease(serverId string, ip string, endpointPorts []corev1.EndpointPort) error
// RemoveEndpoints removes this apiserver's peer endpoint lease.
RemoveLease(serverId string) error
// Destroy cleans up everything on shutdown.
Destroy()
// StopReconciling turns any later ReconcileEndpoints call into a noop.
StopReconciling()
}
type peerEndpointLeaseReconciler struct {
serverLeases *peerEndpointLeases
stopReconcilingCalled atomic.Bool
}
// NewPeerEndpointLeaseReconciler creates a new peer endpoint lease reconciler
func NewPeerEndpointLeaseReconciler(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (PeerEndpointLeaseReconciler, error) {
leaseStorage, destroyFn, err := storagefactory.Create(*config, nil)
if err != nil {
return nil, fmt.Errorf("error creating storage factory: %v", err)
}
var once sync.Once
return &peerEndpointLeaseReconciler{
serverLeases: &peerEndpointLeases{
storage: leaseStorage,
destroyFn: func() { once.Do(destroyFn) },
baseKey: baseKey,
leaseTime: leaseTime,
},
}, nil
}
// PeerEndpointController is the controller manager for updating the peer endpoint leases.
// This provides a separate independent reconciliation loop for peer endpoint leases
// which ensures that the peer kube-apiservers are fetching the updated endpoint info for a given apiserver
// in the case when the peer wants to proxy the request to the given apiserver because it can not serve the
// request itself due to version mismatch.
type PeerEndpointLeaseController struct {
reconciler PeerEndpointLeaseReconciler
endpointInterval time.Duration
serverId string
// peeraddress stores the IP and port of this kube-apiserver. Used by peer kube-apiservers to
// route request to this apiserver in case of a version skew.
peeraddress string
client kubernetes.Interface
lock sync.Mutex
stopCh chan struct{} // closed by Stop()
}
func New(serverId string, peeraddress string,
reconciler PeerEndpointLeaseReconciler, endpointInterval time.Duration, client kubernetes.Interface) *PeerEndpointLeaseController {
return &PeerEndpointLeaseController{
reconciler: reconciler,
serverId: serverId,
// peeraddress stores the IP and port of this kube-apiserver. Used by peer kube-apiservers to
// route request to this apiserver in case of a version skew.
peeraddress: peeraddress,
endpointInterval: endpointInterval,
client: client,
stopCh: make(chan struct{}),
}
}
// Start begins the peer endpoint lease reconciler loop that must exist for bootstrapping
// a cluster.
func (c *PeerEndpointLeaseController) Start(stopCh <-chan struct{}) {
localStopCh := make(chan struct{})
go func() {
defer close(localStopCh)
select {
case <-stopCh: // from Start
case <-c.stopCh: // from Stop
}
}()
go c.Run(localStopCh)
}
// RunPeerEndpointReconciler periodically updates the peer endpoint leases
func (c *PeerEndpointLeaseController) Run(stopCh <-chan struct{}) {
// wait until process is ready
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int
c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil
}, stopCh)
wait.NonSlidingUntil(func() {
if err := c.UpdatePeerEndpointLeases(); err != nil {
runtime.HandleError(fmt.Errorf("unable to update peer endpoint leases: %v", err))
}
}, c.endpointInterval, stopCh)
}
// Stop cleans up this apiserver's peer endpoint leases.
func (c *PeerEndpointLeaseController) Stop() {
c.lock.Lock()
defer c.lock.Unlock()
select {
case <-c.stopCh:
return // only close once
default:
close(c.stopCh)
}
finishedReconciling := make(chan struct{})
go func() {
defer close(finishedReconciling)
klog.Infof("Shutting down peer endpoint lease reconciler")
// stop reconciliation
c.reconciler.StopReconciling()
// Ensure that there will be no race condition with the ReconcileEndpointLeases.
if err := c.reconciler.RemoveLease(c.serverId); err != nil {
klog.Errorf("Unable to remove peer endpoint leases: %v", err)
}
c.reconciler.Destroy()
}()
select {
case <-finishedReconciling:
// done
case <-time.After(2 * c.endpointInterval):
// don't block server shutdown forever if we can't reach etcd to remove ourselves
klog.Warning("peer_endpoint_controller's RemoveEndpoints() timed out")
}
}
// UpdatePeerEndpointLeases attempts to update the peer endpoint leases.
func (c *PeerEndpointLeaseController) UpdatePeerEndpointLeases() error {
host, port, err := net.SplitHostPort(c.peeraddress)
if err != nil {
return err
}
p, err := strconv.Atoi(port)
if err != nil {
return err
}
endpointPorts := createEndpointPortSpec(p, "https")
// Ensure that there will be no race condition with the RemoveEndpointLeases.
c.lock.Lock()
defer c.lock.Unlock()
// Refresh the TTL on our key, independently of whether any error or
// update conflict happens below. This makes sure that at least some of
// the servers will add our endpoint lease.
if err := c.reconciler.UpdateLease(c.serverId, host, endpointPorts); err != nil {
return err
}
return nil
}
// UpdateLease resets the TTL on a server IP in storage
// UpdateLease will create a new key if it doesn't exist.
// We use the first element in endpointPorts as a part of the lease's base key
// This is done to support out tests that simulate 2 apiservers running on the same ip but
// different ports
// It will also do the following if UnknownVersionInteroperabilityProxy feature is enabled
// 1. store the apiserverId as a label
// 2. store the values passed to --peer-advertise-ip and --peer-advertise-port flags to kube-apiserver as an annotation
// with value of format <ip:port>
func (r *peerEndpointLeaseReconciler) UpdateLease(serverId string, ip string, endpointPorts []corev1.EndpointPort) error {
// reconcile endpoints only if apiserver was not shutdown
if r.stopReconcilingCalled.Load() {
return nil
}
// we use the serverID as the key to avoid using the server IP, port as the key.
// note: this means that this lease doesn't enforce mutual exclusion of ip/port usage between apiserver.
key := path.Join(r.serverLeases.baseKey, serverId)
return r.serverLeases.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
existing := input.(*corev1.Endpoints)
existing.Subsets = []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{{IP: ip}},
Ports: endpointPorts,
},
}
// store this server's identity (serverId) as a label. This will be used by
// peers to find the IP of this server when the peer can not serve a request
// due to version skew.
if existing.Labels == nil {
existing.Labels = map[string]string{}
}
existing.Labels[APIServerIdentityLabel] = serverId
// leaseTime needs to be in seconds
leaseTime := uint64(r.serverLeases.leaseTime / time.Second)
// NB: GuaranteedUpdate does not perform the store operation unless
// something changed between load and store (not including resource
// version), meaning we can't refresh the TTL without actually
// changing a field.
existing.Generation++
klog.V(6).Infof("Resetting TTL on server IP %q listed in storage to %v", ip, leaseTime)
return existing, &leaseTime, nil
}, nil)
}
// ListLeases retrieves a list of the current server IPs from storage
func (r *peerEndpointLeaseReconciler) ListLeases() ([]string, error) {
storageOpts := storage.ListOptions{
ResourceVersion: "0",
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
Predicate: storage.Everything,
Recursive: true,
}
ipInfoList, err := r.getIpInfoList(storageOpts)
if err != nil {
return nil, err
}
ipList := make([]string, 0, len(ipInfoList.Items))
for _, ip := range ipInfoList.Items {
if len(ip.Subsets) > 0 && len(ip.Subsets[0].Addresses) > 0 && len(ip.Subsets[0].Addresses[0].IP) > 0 {
ipList = append(ipList, ip.Subsets[0].Addresses[0].IP)
}
}
klog.V(6).Infof("Current server IPs listed in storage are %v", ipList)
return ipList, nil
}
// GetLease retrieves the server IP and port for a specific server id
func (r *peerEndpointLeaseReconciler) GetLease(serverId string) (string, error) {
var fullAddr string
if serverId == "" {
return "", fmt.Errorf("error getting endpoint for serverId: empty serverId")
}
storageOpts := storage.ListOptions{
ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
Predicate: storage.Everything,
Recursive: true,
}
ipInfoList, err := r.getIpInfoList(storageOpts)
if err != nil {
return "", err
}
for _, ip := range ipInfoList.Items {
if ip.Labels[APIServerIdentityLabel] == serverId {
if len(ip.Subsets) > 0 {
var ipStr, portStr string
if len(ip.Subsets[0].Addresses) > 0 {
if len(ip.Subsets[0].Addresses[0].IP) > 0 {
ipStr = ip.Subsets[0].Addresses[0].IP
}
}
if len(ip.Subsets[0].Ports) > 0 {
portStr = fmt.Sprint(ip.Subsets[0].Ports[0].Port)
}
fullAddr = net.JoinHostPort(ipStr, portStr)
break
}
}
}
klog.V(6).Infof("Fetched this server IP for the specified apiserverId %v, %v", serverId, fullAddr)
return fullAddr, nil
}
func (r *peerEndpointLeaseReconciler) StopReconciling() {
r.stopReconcilingCalled.Store(true)
}
// RemoveLease removes the lease on a server IP in storage
// We use the first element in endpointPorts as a part of the lease's base key
// This is done to support out tests that simulate 2 apiservers running on the same ip but
// different ports
func (r *peerEndpointLeaseReconciler) RemoveLease(serverId string) error {
key := path.Join(r.serverLeases.baseKey, serverId)
return r.serverLeases.storage.Delete(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil)
}
func (r *peerEndpointLeaseReconciler) Destroy() {
r.serverLeases.destroyFn()
}
func (r *peerEndpointLeaseReconciler) GetEndpoint(serverId string) (string, error) {
return r.GetLease(serverId)
}
func (r *peerEndpointLeaseReconciler) getIpInfoList(storageOpts storage.ListOptions) (*corev1.EndpointsList, error) {
ipInfoList := &corev1.EndpointsList{}
if err := r.serverLeases.storage.GetList(apirequest.NewDefaultContext(), r.serverLeases.baseKey, storageOpts, ipInfoList); err != nil {
return nil, err
}
return ipInfoList, nil
}
// createEndpointPortSpec creates the endpoint ports
func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.EndpointPort {
return []corev1.EndpointPort{{
Protocol: corev1.ProtocolTCP,
Port: int32(endpointPort),
Name: endpointPortName,
}}
}

View File

@ -0,0 +1,278 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconcilers
import (
"reflect"
"sort"
"testing"
"time"
"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
)
func init() {
var scheme = runtime.NewScheme()
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
utilruntime.Must(corev1.AddToScheme(scheme))
utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion))
codecs = serializer.NewCodecFactory(scheme)
}
var codecs serializer.CodecFactory
type serverInfo struct {
existingIP string
id string
ports []corev1.EndpointPort
newIP string
removeLease bool
expectEndpoint string
}
func NewFakePeerEndpointReconciler(t *testing.T, s storage.Interface) peerEndpointLeaseReconciler {
// use the same base key used by the controlplane, but add a random
// prefix so we can reuse the etcd instance for subtests independently.
base := "/" + uuid.New().String() + "/peerserverleases/"
return peerEndpointLeaseReconciler{serverLeases: &peerEndpointLeases{
storage: s,
destroyFn: func() {},
baseKey: base,
leaseTime: 1 * time.Minute, // avoid the lease to timeout on tests
}}
}
func (f *peerEndpointLeaseReconciler) SetKeys(servers []serverInfo) error {
for _, server := range servers {
if err := f.UpdateLease(server.id, server.existingIP, server.ports); err != nil {
return err
}
}
return nil
}
func TestPeerEndpointLeaseReconciler(t *testing.T) {
// enable feature flags
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
t.Cleanup(dFunc)
tests := []struct {
testName string
servers []serverInfo
expectLeases []string
}{
{
testName: "existing IP satisfy",
servers: []serverInfo{{
existingIP: "4.3.2.1",
id: "server-1",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
expectEndpoint: "4.3.2.1:8080",
}, {
existingIP: "1.2.3.4",
id: "server-2",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
expectEndpoint: "1.2.3.4:8080",
}},
expectLeases: []string{"4.3.2.1", "1.2.3.4"},
},
{
testName: "existing IP + new IP = should return the new IP",
servers: []serverInfo{{
existingIP: "4.3.2.2",
id: "server-1",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
newIP: "4.3.2.1",
expectEndpoint: "4.3.2.1:8080",
}, {
existingIP: "1.2.3.4",
id: "server-2",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
newIP: "1.1.1.1",
expectEndpoint: "1.1.1.1:8080",
}},
expectLeases: []string{"4.3.2.1", "1.1.1.1"},
},
{
testName: "no existing IP, should return new IP",
servers: []serverInfo{{
id: "server-1",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
newIP: "1.2.3.4",
expectEndpoint: "1.2.3.4:8080",
}},
expectLeases: []string{"1.2.3.4"},
},
}
for _, test := range tests {
t.Run(test.testName, func(t *testing.T) {
fakeReconciler := NewFakePeerEndpointReconciler(t, s)
err := fakeReconciler.SetKeys(test.servers)
if err != nil {
t.Errorf("unexpected error creating keys: %v", err)
}
for _, server := range test.servers {
if server.newIP != "" {
err = fakeReconciler.UpdateLease(server.id, server.newIP, server.ports)
if err != nil {
t.Errorf("unexpected error reconciling: %v", err)
}
}
}
leases, err := fakeReconciler.ListLeases()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort for comparison
sort.Strings(leases)
sort.Strings(test.expectLeases)
if !reflect.DeepEqual(leases, test.expectLeases) {
t.Errorf("expected %v got: %v", test.expectLeases, leases)
}
for _, server := range test.servers {
endpoint, err := fakeReconciler.GetLease(server.id)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if endpoint != server.expectEndpoint {
t.Errorf("expected %v got: %v", server.expectEndpoint, endpoint)
}
}
})
}
}
func TestPeerLeaseRemoveEndpoints(t *testing.T) {
// enable feature flags
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
t.Cleanup(dFunc)
stopTests := []struct {
testName string
servers []serverInfo
expectLeases []string
apiServerStartup bool
}{
{
testName: "successful remove previous endpoints before apiserver starts",
servers: []serverInfo{
{
existingIP: "1.2.3.4",
id: "test-server-1",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
removeLease: true,
},
{
existingIP: "2.4.6.8",
id: "test-server-2",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
expectLeases: []string{"2.4.6.8"},
apiServerStartup: true,
},
{
testName: "stop reconciling with new IP not in existing ip list",
servers: []serverInfo{{
existingIP: "1.2.3.4",
newIP: "4.6.8.9",
id: "test-server-1",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
},
{
existingIP: "2.4.6.8",
id: "test-server-2",
ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
removeLease: true,
}},
expectLeases: []string{"1.2.3.4"},
},
}
for _, test := range stopTests {
t.Run(test.testName, func(t *testing.T) {
fakeReconciler := NewFakePeerEndpointReconciler(t, s)
err := fakeReconciler.SetKeys(test.servers)
if err != nil {
t.Errorf("unexpected error creating keys: %v", err)
}
if !test.apiServerStartup {
fakeReconciler.StopReconciling()
}
for _, server := range test.servers {
if server.removeLease {
err = fakeReconciler.RemoveLease(server.id)
// if the ip is not on the endpoints, it must return an storage error and stop reconciling
if err != nil {
t.Errorf("unexpected error reconciling: %v", err)
}
}
}
leases, err := fakeReconciler.ListLeases()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort for comparison
sort.Strings(leases)
sort.Strings(test.expectLeases)
if !reflect.DeepEqual(leases, test.expectLeases) {
t.Errorf("expected %v got: %v", test.expectLeases, leases)
}
})
}
}

View File

@ -86,6 +86,13 @@ import (
_ "k8s.io/apiserver/pkg/apis/apiserver/install"
)
// hostnameFunc is a function to set the hostnameFunc of this apiserver.
// To be used for testing purpose only, to simulate scenarios where multiple apiservers
// exist. In such cases we want to ensure unique apiserver IDs which are a hash of hostnameFunc.
var (
hostnameFunc = os.Hostname
)
const (
// DefaultLegacyAPIPrefix is where the legacy APIs will be located.
DefaultLegacyAPIPrefix = "/api"
@ -367,7 +374,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
defaultHealthChecks := []healthz.HealthChecker{healthz.PingHealthz, healthz.LogHealthz}
var id string
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
hostname, err := os.Hostname()
hostname, err := hostnameFunc()
if err != nil {
klog.Fatalf("error getting hostname for apiserver identity: %v", err)
}
@ -897,7 +904,9 @@ func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c
}
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler := filterlatency.TrackCompleted(apiHandler)
handler := apiHandler
handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "authorization")
@ -1070,3 +1079,12 @@ func AuthorizeClientBearerToken(loopback *restclient.Config, authn *Authenticati
tokenAuthenticator := authenticatorfactory.NewFromTokens(tokens, authn.APIAudiences)
authn.Authenticator = authenticatorunion.New(tokenAuthenticator, authn.Authenticator)
}
// For testing purpose only
func SetHostnameFuncForTests(name string) {
hostnameFunc = func() (host string, err error) {
host = name
err = nil
return
}
}

View File

@ -22,6 +22,7 @@ import (
cachermetrics "k8s.io/apiserver/pkg/storage/cacher/metrics"
etcd3metrics "k8s.io/apiserver/pkg/storage/etcd3/metrics"
flowcontrolmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics"
peerproxymetrics "k8s.io/apiserver/pkg/util/peerproxy/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
@ -50,4 +51,5 @@ func register() {
cachermetrics.Register()
etcd3metrics.Register()
flowcontrolmetrics.Register()
peerproxymetrics.Register()
}

View File

@ -0,0 +1,56 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"context"
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
const (
subsystem = "apiserver"
statuscode = "code"
)
var registerMetricsOnce sync.Once
var (
// peerProxiedRequestsTotal counts the number of requests that were proxied to a peer kube-apiserver.
peerProxiedRequestsTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: subsystem,
Name: "rerouted_request_total",
Help: "Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it",
StabilityLevel: metrics.ALPHA,
},
[]string{statuscode},
)
)
func Register() {
registerMetricsOnce.Do(func() {
legacyregistry.MustRegister(peerProxiedRequestsTotal)
})
}
// IncPeerProxiedRequest increments the # of proxied requests to peer kube-apiserver
func IncPeerProxiedRequest(ctx context.Context, status string) {
peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1)
}

View File

@ -0,0 +1,67 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"net/http"
"sync"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/reconcilers"
"k8s.io/apiserver/pkg/storageversion"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
// Interface defines how the Unknown Version Proxy filter interacts with the underlying system.
type Interface interface {
WrapHandler(handler http.Handler) http.Handler
WaitForCacheSync(stopCh <-chan struct{}) error
HasFinishedSync() bool
}
// New creates a new instance to implement unknown version proxy
func NewPeerProxyHandler(informerFactory kubeinformers.SharedInformerFactory,
svm storageversion.Manager,
proxyTransport http.RoundTripper,
serverId string,
reconciler reconcilers.PeerEndpointLeaseReconciler,
serializer runtime.NegotiatedSerializer) *peerProxyHandler {
h := &peerProxyHandler{
name: "PeerProxyHandler",
storageversionManager: svm,
proxyTransport: proxyTransport,
svMap: sync.Map{},
serverId: serverId,
reconciler: reconciler,
serializer: serializer,
}
svi := informerFactory.Internal().V1alpha1().StorageVersions()
h.storageversionInformer = svi.Informer()
svi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
h.addSV(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
h.updateSV(oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
h.deleteSV(obj)
}})
return h
}

View File

@ -0,0 +1,357 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"sync/atomic"
"k8s.io/api/apiserverinternal/v1alpha1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/proxy"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/apiserver/pkg/reconcilers"
"k8s.io/apiserver/pkg/storageversion"
"k8s.io/apiserver/pkg/util/peerproxy/metrics"
apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
)
const (
PeerProxiedHeader = "x-kubernetes-peer-proxied"
)
type peerProxyHandler struct {
name string
// StorageVersion informer used to fetch apiserver ids than can serve a resource
storageversionInformer cache.SharedIndexInformer
// StorageVersion manager used to ensure it has finished updating storageversions before
// we start handling external requests
storageversionManager storageversion.Manager
// proxy transport
proxyTransport http.RoundTripper
// identity for this server
serverId string
// reconciler that is used to fetch host port of peer apiserver when proxying request to a peer
reconciler reconcilers.PeerEndpointLeaseReconciler
serializer runtime.NegotiatedSerializer
// SyncMap for storing an up to date copy of the storageversions and apiservers that can serve them
// This map is populated using the StorageVersion informer
// This map has key set to GVR and value being another SyncMap
// The nested SyncMap has key set to apiserver id and value set to boolean
// The nested maps are created to have a "Set" like structure to store unique apiserver ids
// for a given GVR
svMap sync.Map
finishedSync atomic.Bool
}
type serviceableByResponse struct {
locallyServiceable bool
errorFetchingAddressFromLease bool
peerEndpoints []string
}
// responder implements rest.Responder for assisting a connector in writing objects or errors.
type responder struct {
w http.ResponseWriter
ctx context.Context
}
func (h *peerProxyHandler) HasFinishedSync() bool {
return h.finishedSync.Load()
}
func (h *peerProxyHandler) WaitForCacheSync(stopCh <-chan struct{}) error {
ok := cache.WaitForNamedCacheSync("unknown-version-proxy", stopCh, h.storageversionInformer.HasSynced, h.storageversionManager.Completed)
if !ok {
return fmt.Errorf("error while waiting for initial cache sync")
}
klog.V(3).Infof("setting finishedSync to true")
h.finishedSync.Store(true)
return nil
}
// WrapHandler will fetch the apiservers that can serve the request and either serve it locally
// or route it to a peer
func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
responsewriters.InternalError(w, r, errors.New("no RequestInfo found in the context"))
return
}
// Allow non-resource requests
if !requestInfo.IsResourceRequest {
klog.V(3).Infof("Not a resource request skipping proxying")
handler.ServeHTTP(w, r)
return
}
// Request has already been proxied once, it must be served locally
if r.Header.Get(PeerProxiedHeader) == "true" {
klog.V(3).Infof("Already rerouted once, skipping proxying to peer")
handler.ServeHTTP(w, r)
return
}
// StorageVersion Informers and/or StorageVersionManager is not synced yet, pass request to next handler
// This will happen for self requests from the kube-apiserver because we have a poststarthook
// to ensure that external requests are not served until the StorageVersion Informer and
// StorageVersionManager has synced
if !h.HasFinishedSync() {
handler.ServeHTTP(w, r)
return
}
gvr := schema.GroupVersionResource{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion, Resource: requestInfo.Resource}
if requestInfo.APIGroup == "" {
gvr.Group = "core"
}
// find servers that are capable of serving this request
serviceableByResp, err := h.findServiceableByServers(gvr, h.serverId, h.reconciler)
if err != nil {
// this means that resource is an aggregated API or a CR since it wasn't found in SV informer cache, pass as it is
handler.ServeHTTP(w, r)
return
}
// found the gvr locally, pass request to the next handler in local apiserver
if serviceableByResp.locallyServiceable {
handler.ServeHTTP(w, r)
return
}
gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
if serviceableByResp.errorFetchingAddressFromLease {
klog.ErrorS(err, "error fetching ip and port of remote server while proxying")
responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable("Error getting ip and port info of the remote server while proxying"), h.serializer, gv, w, r)
return
}
// no apiservers were found that could serve the request, pass request to
// next handler, that should eventually serve 404
// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
// consult the storageversion-informed map for those
if len(serviceableByResp.peerEndpoints) == 0 {
klog.Errorf(fmt.Sprintf("GVR %v is not served by anything in this cluster", gvr))
handler.ServeHTTP(w, r)
return
}
// otherwise, randomly select an apiserver and proxy request to it
rand := rand.Intn(len(serviceableByResp.peerEndpoints))
destServerHostPort := serviceableByResp.peerEndpoints[rand]
h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort)
})
}
func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource, localAPIServerId string, reconciler reconcilers.PeerEndpointLeaseReconciler) (serviceableByResponse, error) {
apiserversi, ok := h.svMap.Load(gvr)
// no value found for the requested gvr in svMap
if !ok || apiserversi == nil {
return serviceableByResponse{}, fmt.Errorf("no StorageVersions found for the GVR: %v", gvr)
}
apiservers := apiserversi.(*sync.Map)
response := serviceableByResponse{}
var peerServerEndpoints []string
apiservers.Range(func(key, value interface{}) bool {
apiserverKey := key.(string)
if apiserverKey == localAPIServerId {
response.errorFetchingAddressFromLease = true
response.locallyServiceable = true
// stop iteration
return false
}
hostPort, err := reconciler.GetEndpoint(apiserverKey)
if err != nil {
response.errorFetchingAddressFromLease = true
klog.Errorf("failed to get peer ip from storage lease for server %s", apiserverKey)
// continue with iteration
return true
}
// check ip format
_, _, err = net.SplitHostPort(hostPort)
if err != nil {
response.errorFetchingAddressFromLease = true
klog.Errorf("invalid address found for server %s", apiserverKey)
// continue with iteration
return true
}
peerServerEndpoints = append(peerServerEndpoints, hostPort)
// continue with iteration
return true
})
response.peerEndpoints = peerServerEndpoints
return response, nil
}
func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
user, ok := apirequest.UserFrom(req.Context())
if !ok {
klog.Errorf("failed to get user info from request")
return
}
// write a new location based on the existing request pointed at the target service
location := &url.URL{}
location.Scheme = "https"
location.Host = host
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, req)
newReq.Header.Add(PeerProxiedHeader, "true")
defer cancelFn()
proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), h.proxyTransport)
delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw}
w := responsewriter.WrapForHTTP1Or2(delegate)
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()})
handler.ServeHTTP(w, newReq)
// Increment the count of proxied requests
metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status()))
}
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
klog.Errorf("Error while proxying request to destination apiserver: %v", err)
http.Error(w, err.Error(), http.StatusServiceUnavailable)
}
// Adds a storageversion object to SVMap
func (h *peerProxyHandler) addSV(obj interface{}) {
sv, ok := obj.(*v1alpha1.StorageVersion)
if !ok {
klog.Errorf("Invalid StorageVersion provided to addSV()")
return
}
h.updateSVMap(nil, sv)
}
// Updates the SVMap to delete old storageversion and add new storageversion
func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
oldSV, ok := oldObj.(*v1alpha1.StorageVersion)
if !ok {
klog.Errorf("Invalid StorageVersion provided to updateSV()")
return
}
newSV, ok := newObj.(*v1alpha1.StorageVersion)
if !ok {
klog.Errorf("Invalid StorageVersion provided to updateSV()")
return
}
h.updateSVMap(oldSV, newSV)
}
// Deletes a storageversion object from SVMap
func (h *peerProxyHandler) deleteSV(obj interface{}) {
sv, ok := obj.(*v1alpha1.StorageVersion)
if !ok {
klog.Errorf("Invalid StorageVersion provided to deleteSV()")
return
}
h.updateSVMap(sv, nil)
}
// Delete old storageversion, add new storagversion
func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) {
if oldSV != nil {
// delete old SV entries
h.deleteSVFromMap(oldSV)
}
if newSV != nil {
// add new SV entries
h.addSVToMap(newSV)
}
}
func (h *peerProxyHandler) deleteSVFromMap(sv *v1alpha1.StorageVersion) {
// The name of storageversion is <group>.<resource>
splitInd := strings.LastIndex(sv.Name, ".")
group := sv.Name[:splitInd]
resource := sv.Name[splitInd+1:]
gvr := schema.GroupVersionResource{Group: group, Resource: resource}
for _, gr := range sv.Status.StorageVersions {
for _, version := range gr.ServedVersions {
versionSplit := strings.Split(version, "/")
if len(versionSplit) == 2 {
version = versionSplit[1]
}
gvr.Version = version
h.svMap.Delete(gvr)
}
}
}
func (h *peerProxyHandler) addSVToMap(sv *v1alpha1.StorageVersion) {
// The name of storageversion is <group>.<resource>
splitInd := strings.LastIndex(sv.Name, ".")
group := sv.Name[:splitInd]
resource := sv.Name[splitInd+1:]
gvr := schema.GroupVersionResource{Group: group, Resource: resource}
for _, gr := range sv.Status.StorageVersions {
for _, version := range gr.ServedVersions {
// some versions have groups included in them, so get rid of the groups
versionSplit := strings.Split(version, "/")
if len(versionSplit) == 2 {
version = versionSplit[1]
}
gvr.Version = version
apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
apiservers := apiserversi.(*sync.Map)
apiservers.Store(gr.APIServerID, true)
}
}
}

View File

@ -0,0 +1,329 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"net/http"
"strings"
"sync"
"testing"
"time"
"net/http/httptest"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authentication/user"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/reconcilers"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/peerproxy/metrics"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/transport"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
)
const (
requestTimeout = 30 * time.Second
localServerId = "local-apiserver"
remoteServerId = "remote-apiserver"
)
type FakeSVMapData struct {
gvr schema.GroupVersionResource
serverId string
}
type reconciler struct {
do bool
publicIP string
serverId string
}
func TestPeerProxy(t *testing.T) {
testCases := []struct {
desc string
svdata FakeSVMapData
informerFinishedSync bool
requestPath string
peerproxiedHeader string
expectedStatus int
metrics []string
want string
reconcilerConfig reconciler
}{
{
desc: "allow non resource requests",
requestPath: "/foo/bar/baz",
expectedStatus: http.StatusOK,
},
{
desc: "allow if already proxied once",
requestPath: "/api/bar/baz",
expectedStatus: http.StatusOK,
peerproxiedHeader: "true",
},
{
desc: "allow if unsynced informers",
requestPath: "/api/bar/baz",
expectedStatus: http.StatusOK,
informerFinishedSync: false,
},
{
desc: "allow if no storage version found",
requestPath: "/api/bar/baz",
expectedStatus: http.StatusOK,
informerFinishedSync: true,
},
{
// since if no server id is found, we pass request to next handler
//, and our last handler in local chain is an http ok handler
desc: "200 if no serverid found",
requestPath: "/api/bar/baz",
expectedStatus: http.StatusOK,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "bar",
Resource: "baz"},
serverId: ""},
},
{
desc: "503 if no endpoint fetched from lease",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverId: remoteServerId},
},
{
desc: "200 if locally serviceable",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusOK,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverId: localServerId},
},
{
desc: "503 unreachable peer bind address",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverId: remoteServerId},
reconcilerConfig: reconciler{
do: true,
publicIP: "1.2.3.4",
serverId: remoteServerId,
},
metrics: []string{
"apiserver_rerouted_request_total",
},
want: `
# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
# TYPE apiserver_rerouted_request_total counter
apiserver_rerouted_request_total{code="503"} 1
`,
},
{
desc: "503 unreachable peer public address",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverId: remoteServerId},
reconcilerConfig: reconciler{
do: true,
publicIP: "1.2.3.4",
serverId: remoteServerId,
},
metrics: []string{
"apiserver_rerouted_request_total",
},
want: `
# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
# TYPE apiserver_rerouted_request_total counter
apiserver_rerouted_request_total{code="503"} 2
`,
},
}
metrics.Register()
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
lastHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
})
reconciler := newFakePeerEndpointReconciler(t)
handler := newHandlerChain(t, lastHandler, reconciler, tt.informerFinishedSync, tt.svdata)
server, requestGetter := createHTTP2ServerWithClient(handler, requestTimeout*2)
defer server.Close()
if tt.reconcilerConfig.do {
// need to enable feature flags first
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
reconciler.UpdateLease(tt.reconcilerConfig.serverId,
tt.reconcilerConfig.publicIP,
[]corev1.EndpointPort{{Name: "foo",
Port: 8080, Protocol: "TCP"}})
}
req, err := http.NewRequest(http.MethodGet, server.URL+tt.requestPath, nil)
if err != nil {
t.Fatalf("failed to create new http request - %v", err)
}
req.Header.Set(PeerProxiedHeader, tt.peerproxiedHeader)
resp, _ := requestGetter(req)
// compare response
assert.Equal(t, tt.expectedStatus, resp.StatusCode)
// compare metric
if tt.want != "" {
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.want), tt.metrics...); err != nil {
t.Fatal(err)
}
}
})
}
}
func newFakePeerEndpointReconciler(t *testing.T) reconcilers.PeerEndpointLeaseReconciler {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
t.Cleanup(func() { server.Terminate(t) })
scheme := runtime.NewScheme()
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
//utilruntime.Must(core.AddToScheme(scheme))
utilruntime.Must(corev1.AddToScheme(scheme))
utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion))
codecs := serializer.NewCodecFactory(scheme)
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
config := *sc.ForResource(schema.GroupResource{Resource: "endpoints"})
baseKey := "/" + uuid.New().String() + "/peer-testleases/"
leaseTime := 1 * time.Minute
reconciler, err := reconcilers.NewPeerEndpointLeaseReconciler(&config, baseKey, leaseTime)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
return reconciler
}
func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.PeerEndpointLeaseReconciler, informerFinishedSync bool, svdata FakeSVMapData) http.Handler {
// Add peerproxy handler
s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
peerProxyHandler, err := newFakePeerProxyHandler(informerFinishedSync, reconciler, svdata, localServerId, s)
if err != nil {
t.Fatalf("Error creating peer proxy handler: %v", err)
}
peerProxyHandler.finishedSync.Store(informerFinishedSync)
handler = peerProxyHandler.WrapHandler(handler)
// Add user info
handler = withFakeUser(handler)
// Add requestInfo handler
requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")}
handler = apifilters.WithRequestInfo(handler, requestInfoFactory)
return handler
}
func newFakePeerProxyHandler(informerFinishedSync bool, reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) {
clientset := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(clientset, 0)
clientConfig := &transport.Config{
TLS: transport.TLSConfig{
Insecure: false,
}}
proxyRoundTripper, err := transport.New(clientConfig)
if err != nil {
return nil, err
}
ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s)
if testDataExists(svdata.gvr) {
ppI.addToStorageVersionMap(svdata.gvr, svdata.serverId)
}
return ppI, nil
}
func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverId string) {
apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
apiservers := apiserversi.(*sync.Map)
if serverId != "" {
apiservers.Store(serverId, true)
}
}
func testDataExists(gvr schema.GroupVersionResource) bool {
return gvr.Group != "" && gvr.Version != "" && gvr.Resource != ""
}
func withFakeUser(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{
Groups: r.Header["Groups"],
}))
handler.ServeHTTP(w, r)
})
}
// returns a started http2 server, with a client function to send request to the server.
func createHTTP2ServerWithClient(handler http.Handler, clientTimeout time.Duration) (*httptest.Server, func(req *http.Request) (*http.Response, error)) {
server := httptest.NewUnstartedServer(handler)
server.EnableHTTP2 = true
server.StartTLS()
cli := server.Client()
cli.Timeout = clientTimeout
return server, func(req *http.Request) (*http.Response, error) {
return cli.Do(req)
}
}

View File

@ -17,17 +17,30 @@ limitations under the License.
package proxy
import (
"context"
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
listersv1 "k8s.io/client-go/listers/core/v1"
)
const (
// taken from https://github.com/kubernetes/kubernetes/blob/release-1.27/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go#L47
aggregatedDiscoveryTimeout = 5 * time.Second
)
// findServicePort finds the service port by name or numerically.
func findServicePort(svc *v1.Service, port int32) (*v1.ServicePort, error) {
for _, svcPort := range svc.Spec.Ports {
@ -117,3 +130,34 @@ func ResolveCluster(services listersv1.ServiceLister, namespace, id string, port
return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type)
}
}
// NewRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests
func NewRequestForProxy(location *url.URL, req *http.Request) (*http.Request, context.CancelFunc) {
newCtx := req.Context()
cancelFn := func() {}
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
// trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three
// segments that we are going to proxy, we have a discovery request.
if !requestInfo.IsResourceRequest && len(strings.Split(strings.Trim(requestInfo.Path, "/"), "/")) == 3 {
// discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that
// should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions)
// so forcing a short timeout here helps responsiveness of all clients.
newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout)
}
}
// WithContext creates a shallow clone of the request with the same context.
newReq := req.WithContext(newCtx)
newReq.Header = utilnet.CloneHeader(req.Header)
newReq.URL = location
newReq.Host = location.Host
// If the original request has an audit ID, let's make sure we propagate this
// to the aggregated server.
if auditID, found := audit.AuditIDFrom(req.Context()); found {
newReq.Header.Set(auditinternal.HeaderAuditID, string(auditID))
}
return newReq, cancelFn
}

View File

@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
genericfeatures "k8s.io/apiserver/pkg/features"
peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/egressselector"
@ -76,6 +77,16 @@ const (
// ExtraConfig represents APIServices-specific configuration
type ExtraConfig struct {
// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
// serving certs when routing a request to the peer in the case the request can not be served
// locally due to version skew.
PeerCAFile string
// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
// to this apiserver. This happens in cases where the peer is not able to serve the request due to
// version skew. If unset, AdvertiseAddress/BindAddress will be used.
PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
// ProxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use
// this to confirm the proxy's identity
ProxyClientCertFile string

View File

@ -17,23 +17,18 @@ limitations under the License.
package apiserver
import (
"context"
"net/http"
"net/url"
"strings"
"sync/atomic"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/httpstream"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/proxy"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
"k8s.io/apiserver/pkg/util/x509metrics"
"k8s.io/client-go/transport"
"k8s.io/klog/v2"
@ -43,8 +38,6 @@ import (
const (
aggregatorComponent string = "aggregator"
aggregatedDiscoveryTimeout = 5 * time.Second
)
type certKeyFunc func() ([]byte, []byte)
@ -149,7 +142,7 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
location.Path = req.URL.Path
location.RawQuery = req.URL.Query().Encode()
newReq, cancelFn := newRequestForProxy(location, req)
newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, req)
defer cancelFn()
if handlingInfo.proxyRoundTripper == nil {
@ -177,37 +170,6 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler.ServeHTTP(w, newReq)
}
// newRequestForProxy returns a shallow copy of the original request with a context that may include a timeout for discovery requests
func newRequestForProxy(location *url.URL, req *http.Request) (*http.Request, context.CancelFunc) {
newCtx := req.Context()
cancelFn := func() {}
if requestInfo, ok := genericapirequest.RequestInfoFrom(req.Context()); ok {
// trim leading and trailing slashes. Then "/apis/group/version" requests are for discovery, so if we have exactly three
// segments that we are going to proxy, we have a discovery request.
if !requestInfo.IsResourceRequest && len(strings.Split(strings.Trim(requestInfo.Path, "/"), "/")) == 3 {
// discovery requests are used by kubectl and others to determine which resources a server has. This is a cheap call that
// should be fast for every aggregated apiserver. Latency for aggregation is expected to be low (as for all extensions)
// so forcing a short timeout here helps responsiveness of all clients.
newCtx, cancelFn = context.WithTimeout(newCtx, aggregatedDiscoveryTimeout)
}
}
// WithContext creates a shallow clone of the request with the same context.
newReq := req.WithContext(newCtx)
newReq.Header = utilnet.CloneHeader(req.Header)
newReq.URL = location
newReq.Host = location.Host
// If the original request has an audit ID, let's make sure we propagate this
// to the aggregated server.
if auditID, found := audit.AuditIDFrom(req.Context()); found {
newReq.Header.Set(auditinternal.HeaderAuditID, string(auditID))
}
return newReq, cancelFn
}
// responder implements rest.Responder for assisting a connector in writing objects or errors.
type responder struct {
w http.ResponseWriter

View File

@ -49,6 +49,7 @@ import (
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/server/egressselector"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
@ -747,7 +748,7 @@ func TestGetContextForNewRequest(t *testing.T) {
location.Path = req.URL.Path
nestedReq := req.WithContext(genericapirequest.WithRequestInfo(req.Context(), &genericapirequest.RequestInfo{Path: req.URL.Path}))
newReq, cancelFn := newRequestForProxy(location, nestedReq)
newReq, cancelFn := apiserverproxyutil.NewRequestForProxy(location, nestedReq)
defer cancelFn()
theproxy := proxy.NewUpgradeAwareHandler(location, server.Client().Transport, true, false, &responder{w: w})
@ -802,7 +803,7 @@ func TestNewRequestForProxyWithAuditID(t *testing.T) {
req = req.WithContext(ctx)
}
newReq, _ := newRequestForProxy(req.URL, req)
newReq, _ := apiserverproxyutil.NewRequestForProxy(req.URL, req)
if newReq == nil {
t.Fatal("expected a non nil Request object")
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -0,0 +1,244 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/cert"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversiongc"
"k8s.io/kubernetes/pkg/controlplane"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
testutil "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"
)
func TestPeerProxiedRequest(t *testing.T) {
ktesting.SetDefaultVerbosity(1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
t.Cleanup(cancel)
// ensure to stop cert reloading after shutdown
transport.DialerStopCh = ctx.Done()
// enable feature flags
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()
// create sharedetcd
etcd := framework.SharedEtcd()
// create certificates for aggregation and client-cert auth
proxyCA, err := createProxyCertContent()
require.NoError(t, err)
// start test server with all APIs enabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-a")
serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
EnableCertAuth: true,
ProxyCA: &proxyCA},
[]string{}, etcd)
defer serverA.TearDownFn()
// start another test server with some api disabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-b")
serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{
EnableCertAuth: true,
ProxyCA: &proxyCA},
[]string{fmt.Sprintf("--runtime-config=%s", "batch/v1=false")}, etcd)
defer serverB.TearDownFn()
kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
require.NoError(t, err)
kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
require.NoError(t, err)
// create jobs resource using serverA
job := createJobResource()
_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
require.NoError(t, err)
klog.Infof("\nServerA has created jobs\n")
// List jobs using ServerB
// This request should be proxied to ServerA since ServerB does not have batch API enabled
jobsB, err := kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
require.NoError(t, err)
assert.NotEmpty(t, jobsB)
assert.Equal(t, job.Name, jobsB.Items[0].Name)
}
func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
ktesting.SetDefaultVerbosity(1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
t.Cleanup(cancel)
// ensure to stop cert reloading after shutdown
transport.DialerStopCh = ctx.Done()
// enable feature flags
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)()
// create sharedetcd
etcd := framework.SharedEtcd()
// create certificates for aggregation and client-cert auth
proxyCA, err := createProxyCertContent()
require.NoError(t, err)
// set lease duration to 1s for serverA to ensure that storageversions for serverA are updated
// once it is shutdown
controlplane.IdentityLeaseDurationSeconds = 10
controlplane.IdentityLeaseGCPeriod = time.Second
controlplane.IdentityLeaseRenewIntervalPeriod = 10 * time.Second
// start serverA with all APIs enabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-a")
serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd)
kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
require.NoError(t, err)
// ensure storageversion garbage collector ctlr is set up
informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second)
setupStorageVersionGC(ctx, kubeClientSetA, informersA)
// reset lease duration to default value for serverB and serverC since we will not be
// shutting these down
controlplane.IdentityLeaseDurationSeconds = 3600
// start serverB with some api disabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-b")
serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{
fmt.Sprintf("--runtime-config=%v", "batch/v1=false")}, etcd)
defer serverB.TearDownFn()
kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
require.NoError(t, err)
// ensure storageversion garbage collector ctlr is set up
informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second)
setupStorageVersionGC(ctx, kubeClientSetB, informersB)
// start serverC with all APIs enabled
// override hostname to ensure unique ips
server.SetHostnameFuncForTests("test-server-c")
serverC := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd)
defer serverC.TearDownFn()
// create jobs resource using serverA
job := createJobResource()
_, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{})
require.NoError(t, err)
klog.Infof("\nServerA has created jobs\n")
// shutdown serverA
serverA.TearDownFn()
var jobsB *v1.JobList
// list jobs using ServerB which it should proxy to ServerC and get back valid response
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
jobsB, err = kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
if err != nil {
return false, nil
}
if jobsB != nil {
return true, nil
}
return false, nil
})
klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items))
require.NoError(t, err)
assert.NotEmpty(t, jobsB)
assert.Equal(t, job.Name, jobsB.Items[0].Name)
}
func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) {
leaseInformer := informers.Coordination().V1().Leases()
storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
go leaseInformer.Informer().Run(ctx.Done())
go storageVersionInformer.Informer().Run(ctx.Done())
controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer)
go controller.Run(ctx)
}
func createProxyCertContent() (kastesting.ProxyCA, error) {
result := kastesting.ProxyCA{}
proxySigningKey, err := testutil.NewPrivateKey()
if err != nil {
return result, err
}
proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey)
if err != nil {
return result, err
}
result = kastesting.ProxyCA{
ProxySigningCert: proxySigningCert,
ProxySigningKey: proxySigningKey,
}
return result, nil
}
func createJobResource() *v1.Job {
return &v1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "test-job",
Namespace: "default",
},
Spec: v1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
Image: "test",
},
},
RestartPolicy: corev1.RestartPolicyNever,
},
},
},
}
}

View File

@ -598,6 +598,7 @@ resources:
// the following resources are not encrypted as they are not REST APIs and hence are not expected
// to be encrypted because it would be impossible to perform a storage migration on them
if strings.Contains(kv.String(), "masterleases") ||
strings.Contains(kv.String(), "peerserverleases") ||
strings.Contains(kv.String(), "serviceips") ||
strings.Contains(kv.String(), "servicenodeports") {
// assert that these resources are not encrypted with any provider

3
vendor/modules.txt vendored
View File

@ -1514,6 +1514,7 @@ k8s.io/apiserver/pkg/endpoints/warning
k8s.io/apiserver/pkg/features
k8s.io/apiserver/pkg/quota/v1
k8s.io/apiserver/pkg/quota/v1/generic
k8s.io/apiserver/pkg/reconcilers
k8s.io/apiserver/pkg/registry/generic
k8s.io/apiserver/pkg/registry/generic/registry
k8s.io/apiserver/pkg/registry/generic/rest
@ -1574,6 +1575,8 @@ k8s.io/apiserver/pkg/util/flowcontrol/request
k8s.io/apiserver/pkg/util/flushwriter
k8s.io/apiserver/pkg/util/notfoundhandler
k8s.io/apiserver/pkg/util/openapi
k8s.io/apiserver/pkg/util/peerproxy
k8s.io/apiserver/pkg/util/peerproxy/metrics
k8s.io/apiserver/pkg/util/proxy
k8s.io/apiserver/pkg/util/shufflesharding
k8s.io/apiserver/pkg/util/webhook