Merge pull request #128402 from richabanker/mvp-agg-discovery

KEP 4020: Replace StorageVersionAPI with aggregated discovery to fetch served resources by a peer apiserver

Commit a6227695ab by Kubernetes Prow Robot, 2025-03-18 21:43:49 -07:00, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
16 changed files with 1031 additions and 493 deletions
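For orientation (this explanatory sketch is not part of the PR): with this change, a kube-apiserver learns which resources its peers serve by querying their aggregated discovery endpoints (/api and /apis, apidiscovery.k8s.io/v2) instead of reading StorageVersion objects. Below is a minimal, self-contained Go illustration of how such a discovery response flattens into the set of served GroupVersionResources cached per peer; the function and variable names are illustrative, not identifiers from this PR.

package main

import (
	"fmt"

	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// flattenDiscovery mirrors the loop used by the peer-discovery sync in this PR:
// every (group, version, resource) triple advertised by a peer becomes one entry
// in that peer's served-resources set.
func flattenDiscovery(list *apidiscoveryv2.APIGroupDiscoveryList) map[schema.GroupVersionResource]bool {
	served := map[schema.GroupVersionResource]bool{}
	for _, group := range list.Items {
		groupName := group.Name
		if groupName == "" {
			groupName = "core" // legacy resources served under /api
		}
		for _, version := range group.Versions {
			for _, resource := range version.Resources {
				served[schema.GroupVersionResource{Group: groupName, Version: version.Version, Resource: resource.Resource}] = true
			}
		}
	}
	return served
}

func main() {
	// A made-up discovery document advertising apps/v1 deployments.
	list := &apidiscoveryv2.APIGroupDiscoveryList{
		Items: []apidiscoveryv2.APIGroupDiscovery{{
			ObjectMeta: metav1.ObjectMeta{Name: "apps"},
			Versions: []apidiscoveryv2.APIVersionDiscovery{{
				Version:   "v1",
				Resources: []apidiscoveryv2.APIResourceDiscovery{{Resource: "deployments"}},
			}},
		}},
	}
	fmt.Println(flattenDiscovery(list))
}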


@@ -36,6 +36,7 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/controlplane"
+	"k8s.io/kubernetes/pkg/controlplane/apiserver"
 	"k8s.io/klog/v2"
 )
@@ -214,7 +215,7 @@ func (c *Controller) syncStorageVersion(ctx context.Context, name string) error
 	for _, v := range sv.Status.StorageVersions {
 		lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, v.APIServerID, metav1.GetOptions{})
 		if err != nil || lease == nil || lease.Labels == nil ||
-			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
+			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != apiserver.KubeAPIServer {
 			// We cannot find a corresponding identity lease from apiserver as well.
 			// We need to clean up this storage version.
 			hasInvalidID = true
@@ -243,7 +244,7 @@ func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverint
 	for _, sv := range obj.Status.StorageVersions {
 		lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
 		if err != nil || lease == nil || lease.Labels == nil ||
-			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer {
+			lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != apiserver.KubeAPIServer {
 			// we cannot find a corresponding identity lease in cache, enqueue the storageversion
 			logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name)
 			c.storageVersionQueue.Add(obj.Name)
@@ -269,7 +270,7 @@ func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) {
 	if castObj.Namespace == metav1.NamespaceSystem &&
 		castObj.Labels != nil &&
-		castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer {
+		castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == apiserver.KubeAPIServer {
 		logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name)
 		c.enqueueLease(castObj)
 	}


@@ -311,10 +311,17 @@ func CreateConfig(
 	if err != nil {
 		return nil, nil, err
 	}
+	// build peer proxy config only if peer ca file exists
 	if opts.PeerCAFile != "" {
-		config.PeerProxy, err = BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
-			opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.Extra.PeerEndpointLeaseReconciler, config.Generic.Serializer)
+		leaseInformer := versionedInformers.Coordination().V1().Leases()
+		config.PeerProxy, err = BuildPeerProxy(
+			leaseInformer,
+			genericConfig.LoopbackClientConfig,
+			opts.ProxyClientCertFile,
+			opts.ProxyClientKeyFile, opts.PeerCAFile,
+			opts.PeerAdvertiseAddress,
+			genericConfig.APIServerID,
+			config.Extra.PeerEndpointLeaseReconciler,
+			config.Generic.Serializer)
 		if err != nil {
 			return nil, nil, err
 		}


@@ -78,16 +78,6 @@ func validateNodeSelectorAuthorizationFeature() []error {
 	return nil
 }

-func validateUnknownVersionInteroperabilityProxyFeature() []error {
-	if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
-		if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) {
-			return nil
-		}
-		return []error{fmt.Errorf("UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled")}
-	}
-	return nil
-}
-
 func validateUnknownVersionInteroperabilityProxyFlags(options *Options) []error {
 	err := []error{}
 	if !utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
@@ -142,7 +132,6 @@ func (s *Options) Validate() []error {
 	errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...)
 	errs = append(errs, validateTokenRequest(s)...)
 	errs = append(errs, s.Metrics.Validate()...)
-	errs = append(errs, validateUnknownVersionInteroperabilityProxyFeature()...)
 	errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...)
 	errs = append(errs, validateNodeSelectorAuthorizationFeature()...)
 	errs = append(errs, validateServiceAccountTokenSigningConfig(s)...)


@@ -26,7 +26,6 @@ import (
 	genericoptions "k8s.io/apiserver/pkg/server/options"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	basecompatibility "k8s.io/component-base/compatibility"
-	"k8s.io/component-base/featuregate"
 	basemetrics "k8s.io/component-base/metrics"
 	"k8s.io/kubernetes/pkg/features"
@@ -165,34 +164,6 @@ func TestValidateUnknownVersionInteroperabilityProxy(t *testing.T) {
 	}
 }

-func TestValidateUnknownVersionInteroperabilityProxyFeature(t *testing.T) {
-	const conflict = "UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled"
-	tests := []struct {
-		name            string
-		featuresEnabled []featuregate.Feature
-	}{
-		{
-			name:            "enabled: UnknownVersionInteroperabilityProxy, disabled: StorageVersionAPI",
-			featuresEnabled: []featuregate.Feature{features.UnknownVersionInteroperabilityProxy},
-		},
-	}
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			for _, feature := range test.featuresEnabled {
-				featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, true)
-			}
-			var errMessageGot string
-			if errs := validateUnknownVersionInteroperabilityProxyFeature(); len(errs) > 0 {
-				errMessageGot = errs[0].Error()
-			}
-			if !strings.Contains(errMessageGot, conflict) {
-				t.Errorf("Expected error message to contain: %q, but got: %q", conflict, errMessageGot)
-			}
-		})
-	}
-}
-
 func TestValidateOptions(t *testing.T) {
 	testCases := []struct {
 		name string


@@ -24,13 +24,13 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apiserver/pkg/reconcilers"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/transport"
 	genericapiserver "k8s.io/apiserver/pkg/server"
 	serverstorage "k8s.io/apiserver/pkg/server/storage"
-	"k8s.io/apiserver/pkg/storageversion"
 	utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
-	clientgoinformers "k8s.io/client-go/informers"
-	"k8s.io/client-go/transport"
-	"k8s.io/klog/v2"
+	coordinationv1informers "k8s.io/client-go/informers/coordination/v1"
 	api "k8s.io/kubernetes/pkg/apis/core"
 )
@@ -43,17 +43,24 @@ const (
 	DefaultPeerEndpointReconcilerTTL = 15 * time.Second
 )

-func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager,
-	proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
-	apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
+func BuildPeerProxy(
+	leaseInformer coordinationv1informers.LeaseInformer,
+	loopbackClientConfig *rest.Config,
+	proxyClientCertFile string,
+	proxyClientKeyFile string,
+	peerCAFile string,
+	peerAdvertiseAddress reconcilers.PeerAdvertiseAddress,
+	apiServerID string,
+	reconciler reconcilers.PeerEndpointLeaseReconciler,
+	serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) {
 	if proxyClientCertFile == "" {
 		return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified")
 	}
 	if proxyClientKeyFile == "" {
 		return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified")
 	}
-	// create proxy client config
-	clientConfig := &transport.Config{
+	proxyClientConfig := &transport.Config{
 		TLS: transport.TLSConfig{
 			Insecure: false,
 			CertFile: proxyClientCertFile,
@@ -62,20 +69,15 @@ func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, s
 			ServerName: "kubernetes.default.svc",
 		}}

-	// build proxy transport
-	proxyRoundTripper, transportBuildingError := transport.New(clientConfig)
-	if transportBuildingError != nil {
-		klog.Error(transportBuildingError.Error())
-		return nil, transportBuildingError
-	}
 	return utilpeerproxy.NewPeerProxyHandler(
-		versionedInformer,
-		svm,
-		proxyRoundTripper,
 		apiServerID,
+		IdentityLeaseComponentLabelKey+"="+KubeAPIServer,
+		leaseInformer,
 		reconciler,
 		serializer,
-	), nil
+		loopbackClientConfig,
+		proxyClientConfig,
+	)
 }

 // CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop
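The proxy mTLS transport is no longer built inside BuildPeerProxy; the transport.Config is handed to NewPeerProxyHandler, which calls transport.New itself. As a rough, hypothetical sketch of what that construction amounts to (the certificate paths below are placeholders, not values from this PR):

package main

import (
	"fmt"

	"k8s.io/client-go/transport"
)

func main() {
	// The client certificate/key prove this apiserver's identity to its peer;
	// the CA bundle is used to verify the peer's serving certificate.
	proxyClientConfig := &transport.Config{
		TLS: transport.TLSConfig{
			Insecure:   false,
			CertFile:   "/path/to/proxy-client.crt", // placeholder
			KeyFile:    "/path/to/proxy-client.key", // placeholder
			CAFile:     "/path/to/peer-ca.crt",      // placeholder
			ServerName: "kubernetes.default.svc",
		},
	}
	rt, err := transport.New(proxyClientConfig)
	if err != nil {
		panic(err)
	}
	fmt.Printf("peer proxy round tripper: %T\n", rt)
}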


@@ -74,6 +74,8 @@ const (
 	// 1. the lease is an identity lease (different from leader election leases)
 	// 2. which component owns this lease
 	IdentityLeaseComponentLabelKey = "apiserver.kubernetes.io/identity"
+	// KubeAPIServer defines variable used internally when referring to kube-apiserver component
+	KubeAPIServer = "kube-apiserver"
 )

 // Server is a struct that contains a generic control plane apiserver instance
@@ -212,7 +214,20 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele
 		return nil
 	})
 	if c.Extra.PeerProxy != nil {
-		s.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
+		// Run local-discovery sync loop
+		s.GenericAPIServer.AddPostStartHookOrDie("local-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error {
+			err := c.Extra.PeerProxy.RunLocalDiscoveryCacheSync(context.Done())
+			return err
+		})
+		// Run peer-discovery sync loop.
+		s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error {
+			go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1)
+			return nil
+		})
+		// Wait for handler to be ready.
+		s.GenericAPIServer.AddPostStartHookOrDie("mixed-version-proxy-handler", func(context genericapiserver.PostStartHookContext) error {
 			err := c.Extra.PeerProxy.WaitForCacheSync(context.Done())
 			return err
 		})


@@ -117,8 +117,6 @@ const (
 	// 2. which component owns this lease
 	// TODO(sttts): remove this indirection
 	IdentityLeaseComponentLabelKey = controlplaneapiserver.IdentityLeaseComponentLabelKey
-	// KubeAPIServer defines variable used internally when referring to kube-apiserver component
-	KubeAPIServer = "kube-apiserver"
 	// repairLoopInterval defines the interval used to run the Services ClusterIP and NodePort repair loops
 	repairLoopInterval = 3 * time.Minute
 )
@@ -314,7 +312,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
 		return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
 	}

-	cp, err := c.ControlPlane.New(KubeAPIServer, delegationTarget)
+	cp, err := c.ControlPlane.New(controlplaneapiserver.KubeAPIServer, delegationTarget)
 	if err != nil {
 		return nil, err
 	}


@@ -50,6 +50,11 @@ func Register() {
 	})
 }

+// Only used for tests.
+func Reset() {
+	legacyregistry.Reset()
+}
+
 // IncPeerProxiedRequest increments the # of proxied requests to peer kube-apiserver
 func IncPeerProxiedRequest(ctx context.Context, status string) {
 	peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1)


@ -0,0 +1,198 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"context"
"fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/client-go/discovery"
"k8s.io/klog/v2"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
v1 "k8s.io/api/coordination/v1"
schema "k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
responsewriterutil "k8s.io/apiserver/pkg/util/responsewriter"
)
const (
controllerName = "peer-discovery-cache-sync"
maxRetries = 5
)
func (h *peerProxyHandler) RunPeerDiscoveryCacheSync(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer h.peerLeaseQueue.ShutDown()
defer func() {
err := h.apiserverIdentityInformer.Informer().RemoveEventHandler(h.leaseRegistration)
if err != nil {
klog.Warning("error removing leaseInformer eventhandler")
}
}()
klog.Infof("Workers: %d", workers)
for i := 0; i < workers; i++ {
klog.Infof("Starting worker")
go wait.UntilWithContext(ctx, h.runWorker, time.Second)
}
<-ctx.Done()
}
func (h *peerProxyHandler) enqueueLease(lease *v1.Lease) {
h.peerLeaseQueue.Add(lease.Name)
}
func (h *peerProxyHandler) runWorker(ctx context.Context) {
for h.processNextElectionItem(ctx) {
}
}
func (h *peerProxyHandler) processNextElectionItem(ctx context.Context) bool {
key, shutdown := h.peerLeaseQueue.Get()
if shutdown {
return false
}
defer h.peerLeaseQueue.Done(key)
err := h.syncPeerDiscoveryCache(ctx)
h.handleErr(err, key)
return true
}
func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error {
var fetchDiscoveryErr error
// Rebuild the peer discovery cache from available leases.
leases, err := h.apiserverIdentityInformer.Lister().List(h.identityLeaseLabelSelector)
if err != nil {
utilruntime.HandleError(err)
return err
}
newCache := map[string]map[schema.GroupVersionResource]bool{}
for _, l := range leases {
_, ok := h.isValidPeerIdentityLease(l)
if !ok {
continue
}
discoveryInfo, err := h.fetchNewDiscoveryFor(ctx, l.Name, *l.Spec.HolderIdentity)
if err != nil {
fetchDiscoveryErr = err
}
if discoveryInfo != nil {
newCache[l.Name] = discoveryInfo
}
}
// Overwrite cache with new contents.
h.peerDiscoveryInfoCache.Store(newCache)
return fetchDiscoveryErr
}
func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string, holderIdentity string) (map[schema.GroupVersionResource]bool, error) {
hostport, err := h.hostportInfo(serverID)
if err != nil {
return nil, fmt.Errorf("failed to get host port info from identity lease for server %s: %w", serverID, err)
}
klog.V(4).Infof("Proxying an agg-discovery call from %s to %s", h.serverID, serverID)
servedResources := make(map[schema.GroupVersionResource]bool)
var discoveryErr error
var discoveryResponse *apidiscoveryv2.APIGroupDiscoveryList
discoveryPaths := []string{"/api", "/apis"}
for _, path := range discoveryPaths {
discoveryResponse, discoveryErr = h.aggregateDiscovery(ctx, path, hostport)
if discoveryErr != nil {
klog.ErrorS(discoveryErr, "error querying discovery endpoint for serverID", "path", path, "serverID", serverID)
continue
}
for _, groupDiscovery := range discoveryResponse.Items {
groupName := groupDiscovery.Name
if groupName == "" {
groupName = "core"
}
for _, version := range groupDiscovery.Versions {
for _, resource := range version.Resources {
gvr := schema.GroupVersionResource{Group: groupName, Version: version.Version, Resource: resource.Resource}
servedResources[gvr] = true
}
}
}
}
klog.V(4).Infof("Agg discovery done successfully by %s for %s", h.serverID, serverID)
return servedResources, discoveryErr
}
func (h *peerProxyHandler) aggregateDiscovery(ctx context.Context, path string, hostport string) (*apidiscoveryv2.APIGroupDiscoveryList, error) {
req, err := http.NewRequest(http.MethodGet, path, nil)
if err != nil {
return nil, err
}
apiServerUser := &user.DefaultInfo{
Name: user.APIServerUser,
UID: user.APIServerUser,
Groups: []string{user.AllAuthenticated},
}
ctx = apirequest.WithUser(ctx, apiServerUser)
req = req.WithContext(ctx)
req.Header.Add("Accept", discovery.AcceptV2)
writer := responsewriterutil.NewInMemoryResponseWriter()
h.proxyRequestToDestinationAPIServer(req, writer, hostport)
if writer.RespCode() != http.StatusOK {
return nil, fmt.Errorf("discovery request failed with status: %d", writer.RespCode())
}
parsed := &apidiscoveryv2.APIGroupDiscoveryList{}
if err := runtime.DecodeInto(h.discoverySerializer.UniversalDecoder(), writer.Data(), parsed); err != nil {
return nil, fmt.Errorf("error decoding discovery response: %w", err)
}
return parsed, nil
}
// handleErr checks if an error happened and makes sure we will retry later.
func (h *peerProxyHandler) handleErr(err error, key string) {
if err == nil {
h.peerLeaseQueue.Forget(key)
return
}
if h.peerLeaseQueue.NumRequeues(key) < maxRetries {
klog.Infof("Error syncing discovery for peer lease %v: %v", key, err)
h.peerLeaseQueue.AddRateLimited(key)
return
}
h.peerLeaseQueue.Forget(key)
utilruntime.HandleError(err)
klog.Infof("Dropping lease %s out of the queue: %v", key, err)
}
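For reference, the decode path used by aggregateDiscovery above can be exercised on its own: a scheme with apidiscovery.k8s.io/v2 registered plus a codec factory is enough to turn a response body into an APIGroupDiscoveryList. The snippet below is an illustrative standalone sketch with a hand-written JSON payload, not code or data from this PR.

package main

import (
	"fmt"

	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

func main() {
	// Register only the aggregated discovery types, as the handler does.
	scheme := runtime.NewScheme()
	utilruntime.Must(apidiscoveryv2.AddToScheme(scheme))
	codecs := serializer.NewCodecFactory(scheme)

	// A made-up discovery response body advertising apps/v1 deployments.
	body := []byte(`{
		"kind": "APIGroupDiscoveryList",
		"apiVersion": "apidiscovery.k8s.io/v2",
		"items": [{
			"metadata": {"name": "apps"},
			"versions": [{"version": "v1", "resources": [{"resource": "deployments"}]}]
		}]
	}`)

	parsed := &apidiscoveryv2.APIGroupDiscoveryList{}
	if err := runtime.DecodeInto(codecs.UniversalDecoder(), body, parsed); err != nil {
		panic(err)
	}
	fmt.Println(parsed.Items[0].Name, parsed.Items[0].Versions[0].Resources[0].Resource)
}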


@ -0,0 +1,344 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peerproxy
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
v1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func TestRunPeerDiscoveryCacheSync(t *testing.T) {
localServerID := "local-server"
testCases := []struct {
desc string
leases []*v1.Lease
labelSelectorString string
updatedLease *v1.Lease
deletedLeaseNames []string
wantCache map[string]map[schema.GroupVersionResource]bool
}{
{
desc: "single remote server",
labelSelectorString: "apiserver-identity=testserver",
leases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")},
},
},
wantCache: map[string]map[schema.GroupVersionResource]bool{
"remote-1": {
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
},
},
},
{
desc: "multiple remote servers",
labelSelectorString: "apiserver-identity=testserver",
leases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-2",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-2")},
},
},
wantCache: map[string]map[schema.GroupVersionResource]bool{
"remote-1": {
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
},
"remote-2": {
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
},
},
},
{
desc: "lease update",
labelSelectorString: "apiserver-identity=testserver",
leases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")},
},
},
updatedLease: &v1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-2")},
},
wantCache: map[string]map[schema.GroupVersionResource]bool{
"remote-1": {
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
},
},
},
{
desc: "lease deletion",
labelSelectorString: "apiserver-identity=testserver",
leases: []*v1.Lease{
{
ObjectMeta: metav1.ObjectMeta{
Name: "remote-1",
Labels: map[string]string{"apiserver-identity": "testserver"},
},
Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")},
},
},
deletedLeaseNames: []string{"remote-1"},
wantCache: map[string]map[schema.GroupVersionResource]bool{},
},
}
for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0)
leaseInformer := fakeInformerFactory.Coordination().V1().Leases()
fakeReconciler := newFakeReconciler()
negotiatedSerializer := serializer.NewCodecFactory(runtime.NewScheme())
loopbackConfig := &rest.Config{}
proxyConfig := &transport.Config{
TLS: transport.TLSConfig{Insecure: true},
}
h, err := NewPeerProxyHandler(
localServerID,
tt.labelSelectorString,
leaseInformer,
fakeReconciler,
negotiatedSerializer,
loopbackConfig,
proxyConfig,
)
if err != nil {
t.Fatalf("failed to create handler: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Add leases to the fake client and informer.
for _, lease := range tt.leases {
_, err := fakeClient.CoordinationV1().Leases("default").Create(ctx, lease, metav1.CreateOptions{})
if err != nil {
t.Fatalf("failed to create lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Add(lease); err != nil {
t.Fatalf("failed to create lease: %v", err)
}
}
go fakeInformerFactory.Start(ctx.Done())
cache.WaitForCacheSync(ctx.Done(), leaseInformer.Informer().HasSynced)
// Create test servers based on leases
testServers := make(map[string]*httptest.Server)
for _, lease := range tt.leases {
testServer := newTestTLSServer(t)
defer testServer.Close()
testServers[lease.Name] = testServer
}
// Modify the reconciler to return the test server URLs
for name, server := range testServers {
fakeReconciler.setEndpoint(name, server.URL[8:])
}
go h.RunPeerDiscoveryCacheSync(ctx, 1)
// Wait for initial cache update.
initialCache := map[string]map[schema.GroupVersionResource]bool{}
for _, lease := range tt.leases {
initialCache[lease.Name] = map[schema.GroupVersionResource]bool{
{Group: "testgroup", Version: "v1", Resource: "testresources"}: true,
}
}
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
gotCache := h.peerDiscoveryInfoCache.Load()
return assert.ObjectsAreEqual(initialCache, gotCache), nil
})
if err != nil {
t.Errorf("initial cache update failed: %v", err)
}
// Update the lease if indicated.
if tt.updatedLease != nil {
updatedLease := tt.updatedLease.DeepCopy()
_, err = fakeClient.CoordinationV1().Leases("default").Update(ctx, updatedLease, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("failed to update lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Update(updatedLease); err != nil {
t.Fatalf("failed to update lease: %v", err)
}
}
// Delete leases if indicated.
if len(tt.deletedLeaseNames) > 0 {
for _, leaseName := range tt.deletedLeaseNames {
lease, exists, err := leaseInformer.Informer().GetIndexer().GetByKey("default/" + leaseName)
if err != nil {
t.Fatalf("failed to get lease from indexer: %v", err)
}
if !exists {
t.Fatalf("lease %s not found", leaseName)
}
deletedLease := lease.(*v1.Lease)
err = fakeClient.CoordinationV1().Leases("default").Delete(ctx, deletedLease.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("failed to delete lease: %v", err)
}
if err = leaseInformer.Informer().GetIndexer().Delete(deletedLease); err != nil {
t.Fatalf("failed to delete lease: %v", err)
}
}
}
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
gotCache := h.peerDiscoveryInfoCache.Load()
return assert.ObjectsAreEqual(tt.wantCache, gotCache), nil
})
if err != nil {
t.Errorf("cache doesnt match expectation: %v", err)
}
})
}
}
// newTestTLSServer creates a new httptest.NewTLSServer that serves discovery endpoints.
func newTestTLSServer(t *testing.T) *httptest.Server {
return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/apis" || r.URL.Path == "/api" {
discoveryResponse := &apidiscoveryv2.APIGroupDiscoveryList{
Items: []apidiscoveryv2.APIGroupDiscovery{
{
ObjectMeta: metav1.ObjectMeta{
Name: "testgroup",
},
Versions: []apidiscoveryv2.APIVersionDiscovery{
{
Version: "v1",
Resources: []apidiscoveryv2.APIResourceDiscovery{
{Resource: "testresources"},
},
},
},
},
},
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(discoveryResponse); err != nil {
t.Fatalf("error recording discovery response")
}
} else {
w.WriteHeader(http.StatusNotFound)
}
}))
}
type fakeReconciler struct {
endpoints map[string]string
}
func newFakeReconciler() *fakeReconciler {
return &fakeReconciler{
endpoints: make(map[string]string),
}
}
func (f *fakeReconciler) UpdateLease(serverID string, publicIP string, ports []corev1.EndpointPort) error {
return nil
}
func (f *fakeReconciler) DeleteLease(serverID string) error {
return nil
}
func (f *fakeReconciler) Destroy() {
}
func (f *fakeReconciler) GetEndpoint(serverID string) (string, error) {
endpoint, ok := f.endpoints[serverID]
if !ok {
return "", fmt.Errorf("endpoint not found for serverID: %s", serverID)
}
return endpoint, nil
}
func (f *fakeReconciler) RemoveLease(serverID string) error {
return nil
}
func (f *fakeReconciler) StopReconciling() {
}
func (f *fakeReconciler) setEndpoint(serverID, endpoint string) {
f.endpoints[serverID] = endpoint
}


@@ -17,51 +17,171 @@ limitations under the License.
 package peerproxy

 import (
+	"context"
+	"fmt"
 	"net/http"
-	"sync"
+	"strings"
+	"sync/atomic"
+	"time"

+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/apiserver/pkg/reconcilers"
-	"k8s.io/apiserver/pkg/storageversion"
-	kubeinformers "k8s.io/client-go/informers"
+	"k8s.io/client-go/discovery"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/transport"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+
+	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
+	v1 "k8s.io/api/coordination/v1"
+	schema "k8s.io/apimachinery/pkg/runtime/schema"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	coordinationv1informers "k8s.io/client-go/informers/coordination/v1"
 )

-// Interface defines how the Unknown Version Proxy filter interacts with the underlying system.
+// Local discovery cache needs to be refreshed periodically to store
+// updates made to custom resources or aggregated resource that can
+// change dynamically.
+const localDiscoveryRefreshInterval = 30 * time.Minute
+
+// Interface defines how the Mixed Version Proxy filter interacts with the underlying system.
 type Interface interface {
 	WrapHandler(handler http.Handler) http.Handler
 	WaitForCacheSync(stopCh <-chan struct{}) error
 	HasFinishedSync() bool
+	RunLocalDiscoveryCacheSync(stopCh <-chan struct{}) error
+	RunPeerDiscoveryCacheSync(ctx context.Context, workers int)
 }

 // New creates a new instance to implement unknown version proxy
-func NewPeerProxyHandler(informerFactory kubeinformers.SharedInformerFactory,
-	svm storageversion.Manager,
-	proxyTransport http.RoundTripper,
+// This method is used for an alpha feature UnknownVersionInteroperabilityProxy
+// and is subject to future modifications.
+func NewPeerProxyHandler(
 	serverId string,
+	identityLeaseLabelSelector string,
+	leaseInformer coordinationv1informers.LeaseInformer,
 	reconciler reconcilers.PeerEndpointLeaseReconciler,
-	serializer runtime.NegotiatedSerializer) *peerProxyHandler {
+	ser runtime.NegotiatedSerializer,
+	loopbackClientConfig *rest.Config,
+	proxyClientConfig *transport.Config,
+) (*peerProxyHandler, error) {
 	h := &peerProxyHandler{
 		name: "PeerProxyHandler",
-		storageversionManager: svm,
-		proxyTransport: proxyTransport,
-		svMap: sync.Map{},
-		serverId: serverId,
+		serverID: serverId,
 		reconciler: reconciler,
-		serializer: serializer,
+		serializer: ser,
+		localDiscoveryInfoCache: atomic.Value{},
+		localDiscoveryCacheTicker: time.NewTicker(localDiscoveryRefreshInterval),
+		localDiscoveryInfoCachePopulated: make(chan struct{}),
+		peerDiscoveryInfoCache: atomic.Value{},
+		peerLeaseQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.DefaultTypedControllerRateLimiter[string](),
+			workqueue.TypedRateLimitingQueueConfig[string]{
+				Name: controllerName,
+			}),
+		apiserverIdentityInformer: leaseInformer,
 	}
-	svi := informerFactory.Internal().V1alpha1().StorageVersions()
-	h.storageversionInformer = svi.Informer()
-	svi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+
+	if parts := strings.Split(identityLeaseLabelSelector, "="); len(parts) != 2 {
+		return nil, fmt.Errorf("invalid identityLeaseLabelSelector provided, must be of the form key=value, received: %v", identityLeaseLabelSelector)
+	}
+	selector, err := labels.Parse(identityLeaseLabelSelector)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse label selector: %w", err)
+	}
+	h.identityLeaseLabelSelector = selector
+
+	discoveryScheme := runtime.NewScheme()
+	utilruntime.Must(apidiscoveryv2.AddToScheme(discoveryScheme))
+	h.discoverySerializer = serializer.NewCodecFactory(discoveryScheme)
+
+	discoveryClient, err := discovery.NewDiscoveryClientForConfig(loopbackClientConfig)
+	if err != nil {
+		return nil, fmt.Errorf("error creating discovery client: %w", err)
+	}
+	h.discoveryClient = discoveryClient
+	h.localDiscoveryInfoCache.Store(map[schema.GroupVersionResource]bool{})
+	h.peerDiscoveryInfoCache.Store(map[string]map[schema.GroupVersionResource]bool{})
+
+	proxyTransport, err := transport.New(proxyClientConfig)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create proxy transport: %w", err)
+	}
+	h.proxyTransport = proxyTransport
+
+	peerDiscoveryRegistration, err := h.apiserverIdentityInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
-			h.addSV(obj)
+			if lease, ok := h.isValidPeerIdentityLease(obj); ok {
+				h.enqueueLease(lease)
+			}
 		},
 		UpdateFunc: func(oldObj, newObj interface{}) {
-			h.updateSV(oldObj, newObj)
+			oldLease, oldLeaseOk := h.isValidPeerIdentityLease(oldObj)
+			newLease, newLeaseOk := h.isValidPeerIdentityLease(newObj)
+			if oldLeaseOk && newLeaseOk &&
+				oldLease.Name == newLease.Name && *oldLease.Spec.HolderIdentity != *newLease.Spec.HolderIdentity {
+				h.enqueueLease(newLease)
+			}
 		},
 		DeleteFunc: func(obj interface{}) {
-			h.deleteSV(obj)
-		}})
-	return h
+			if lease, ok := h.isValidPeerIdentityLease(obj); ok {
+				h.enqueueLease(lease)
+			}
+		},
+	})
+	if err != nil {
+		return nil, err
+	}
+	h.leaseRegistration = peerDiscoveryRegistration
+	return h, nil
+}
+
+func (h *peerProxyHandler) isValidPeerIdentityLease(obj interface{}) (*v1.Lease, bool) {
+	lease, ok := obj.(*v1.Lease)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
+			return nil, false
+		}
+		if lease, ok = tombstone.Obj.(*v1.Lease); !ok {
+			utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
+			return nil, false
+		}
+	}
+
+	if lease == nil {
+		klog.Error(fmt.Errorf("nil lease object provided"))
+		return nil, false
+	}
+
+	if h.identityLeaseLabelSelector != nil && h.identityLeaseLabelSelector.String() != "" {
+		identityLeaseLabel := strings.Split(h.identityLeaseLabelSelector.String(), "=")
+		if len(identityLeaseLabel) != 2 {
+			klog.Errorf("invalid identityLeaseLabelSelector format: %s", h.identityLeaseLabelSelector.String())
+			return nil, false
+		}
+
+		if lease.Labels == nil || lease.Labels[identityLeaseLabel[0]] != identityLeaseLabel[1] {
+			klog.V(4).Infof("lease %s/%s does not match label selector: %s=%s", lease.Namespace, lease.Name, identityLeaseLabel[0], identityLeaseLabel[1])
+			return nil, false
+		}
+	}
+
+	// Ignore self.
+	if lease.Name == h.serverID {
+		return nil, false
+	}
+
+	if lease.Spec.HolderIdentity == nil {
+		klog.Error(fmt.Errorf("invalid lease object provided, missing holderIdentity in lease obj"))
+		return nil, false
+	}
+
+	return lease, true
 }


@@ -25,26 +25,30 @@ import (
 	"net/http"
 	"net/url"
 	"strconv"
-	"strings"
 	"sync"
 	"sync/atomic"
+	"time"

-	"k8s.io/api/apiserverinternal/v1alpha1"
-	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
-	schema "k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/proxy"
 	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
-	epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
-	apirequest "k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/endpoints/responsewriter"
 	"k8s.io/apiserver/pkg/reconcilers"
-	"k8s.io/apiserver/pkg/storageversion"
 	"k8s.io/apiserver/pkg/util/peerproxy/metrics"
-	apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
+	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/transport"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	schema "k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
+	epmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
+	apirequest "k8s.io/apiserver/pkg/endpoints/request"
+	apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
+	coordinationv1informers "k8s.io/client-go/informers/coordination/v1"
 )

 const (
@@ -53,33 +57,34 @@ const (
 type peerProxyHandler struct {
 	name string
-	// StorageVersion informer used to fetch apiserver ids than can serve a resource
-	storageversionInformer cache.SharedIndexInformer
-	// StorageVersion manager used to ensure it has finished updating storageversions before
-	// we start handling external requests
-	storageversionManager storageversion.Manager
-	// proxy transport
-	proxyTransport http.RoundTripper
-	// identity for this server
-	serverId string
-	// reconciler that is used to fetch host port of peer apiserver when proxying request to a peer
-	reconciler reconcilers.PeerEndpointLeaseReconciler
-	serializer runtime.NegotiatedSerializer
-	// SyncMap for storing an up to date copy of the storageversions and apiservers that can serve them
-	// This map is populated using the StorageVersion informer
-	// This map has key set to GVR and value being another SyncMap
-	// The nested SyncMap has key set to apiserver id and value set to boolean
-	// The nested maps are created to have a "Set" like structure to store unique apiserver ids
-	// for a given GVR
-	svMap sync.Map
+	// Identity for this server.
+	serverID string
 	finishedSync atomic.Bool
+	// Label to check against in identity leases to make sure
+	// we are working with apiserver identity leases only.
+	identityLeaseLabelSelector labels.Selector
+	apiserverIdentityInformer coordinationv1informers.LeaseInformer
+	leaseRegistration cache.ResourceEventHandlerRegistration
+	// Reconciler that is used to fetch host port of peer apiserver when proxying request to a peer.
+	reconciler reconcilers.PeerEndpointLeaseReconciler
+	// Client to make discovery calls locally.
+	discoveryClient *discovery.DiscoveryClient
+	discoverySerializer serializer.CodecFactory
+	// Cache that stores resources served by this apiserver. Refreshed periodically.
+	// We always look up in the local discovery cache first, to check whether the
+	// request can be served by this apiserver instead of proxying it to a peer.
+	localDiscoveryInfoCache atomic.Value
+	localDiscoveryCacheTicker *time.Ticker
+	localDiscoveryInfoCachePopulated chan struct{}
+	localDiscoveryInfoCachePopulatedOnce sync.Once
+	// Cache that stores resources served by peer apiservers.
+	// Refreshed if a new apiserver identity lease is added, deleted or
+	// holderIndentity change is observed in the lease.
+	peerDiscoveryInfoCache atomic.Value
+	proxyTransport http.RoundTripper
+	// Worker queue that keeps the peerDiscoveryInfoCache up-to-date.
+	peerLeaseQueue workqueue.TypedRateLimitingInterface[string]
+	serializer runtime.NegotiatedSerializer
 }

 // responder implements rest.Responder for assisting a connector in writing objects or errors.
@@ -93,12 +98,22 @@ func (h *peerProxyHandler) HasFinishedSync() bool {
 }

 func (h *peerProxyHandler) WaitForCacheSync(stopCh <-chan struct{}) error {
-	ok := cache.WaitForNamedCacheSync("unknown-version-proxy", stopCh, h.storageversionInformer.HasSynced, h.storageversionManager.Completed)
+	ok := cache.WaitForNamedCacheSync("mixed-version-proxy", stopCh, h.apiserverIdentityInformer.Informer().HasSynced)
 	if !ok {
 		return fmt.Errorf("error while waiting for initial cache sync")
 	}
-	klog.V(3).Infof("setting finishedSync to true")
+
+	if !cache.WaitForNamedCacheSync(controllerName, stopCh, h.leaseRegistration.HasSynced) {
+		return fmt.Errorf("error while waiting for peer-identity-lease event handler registration sync")
+	}
+
+	// Wait for localDiscoveryInfoCache to be populated.
+	select {
+	case <-h.localDiscoveryInfoCachePopulated:
+	case <-stopCh:
+		return fmt.Errorf("stop signal received while waiting for local discovery cache population")
+	}
+
 	h.finishedSync.Store(true)
 	return nil
 }
@@ -109,7 +124,6 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
 		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
-
 		if !ok {
 			responsewriters.InternalError(w, r, errors.New("no RequestInfo found in the context"))
 			return
@@ -129,10 +143,9 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			return
 		}

-		// StorageVersion Informers and/or StorageVersionManager is not synced yet, pass request to next handler
+		// Apiserver Identity Informers is not synced yet, pass request to next handler
 		// This will happen for self requests from the kube-apiserver because we have a poststarthook
-		// to ensure that external requests are not served until the StorageVersion Informer and
-		// StorageVersionManager has synced
+		// to ensure that external requests are not served until the ApiserverIdentity Informer has synced
 		if !h.HasFinishedSync() {
 			handler.ServeHTTP(w, r)
 			return
@@ -143,15 +156,20 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			gvr.Group = "core"
 		}

-		apiservers, err := h.findServiceableByServers(gvr)
-		if err != nil {
-			// resource wasn't found in SV informer cache which means that resource is an aggregated API
-			// or a CR. This situation is ok to be handled by local handler.
+		if h.shouldServeLocally(gvr) {
 			handler.ServeHTTP(w, r)
 			return
 		}

-		locallyServiceable, peerEndpoints, err := h.resolveServingLocation(apiservers)
+		// find servers that are capable of serving this request
+		peerServerIDs := h.findServiceableByPeerFromPeerDiscoveryCache(gvr)
+		if len(peerServerIDs) == 0 {
+			klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
+			handler.ServeHTTP(w, r)
+			return
+		}
+
+		peerEndpoints, err := h.resolveServingLocation(peerServerIDs)
 		if err != nil {
 			gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
 			klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr)
@@ -159,91 +177,145 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			return
 		}

-		// pass request to the next handler if found the gvr locally.
-		// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
-		// consult the storageversion-informed map for those
-		if locallyServiceable {
-			handler.ServeHTTP(w, r)
-			return
-		}
-
-		if len(peerEndpoints) == 0 {
-			klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
-			handler.ServeHTTP(w, r)
-			return
-		}
-
-		// otherwise, randomly select an apiserver and proxy request to it
 		rand := rand.Intn(len(peerEndpoints))
-		destServerHostPort := peerEndpoints[rand]
-		h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort)
+		peerEndpoint := peerEndpoints[rand]
+		h.proxyRequestToDestinationAPIServer(r, w, peerEndpoint)
 	})
 }
func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (*sync.Map, error) { // RunLocalDiscoveryCacheSync populated the localDiscoveryInfoCache and
apiserversi, ok := h.svMap.Load(gvr) // starts a goroutine to periodically refresh the local discovery cache.
if !ok || apiserversi == nil { func (h *peerProxyHandler) RunLocalDiscoveryCacheSync(stopCh <-chan struct{}) error {
return nil, fmt.Errorf("no storageVersions found for the GVR: %v", gvr) klog.Info("localDiscoveryCacheInvalidation goroutine started")
// Populate the cache initially.
if err := h.populateLocalDiscoveryCache(); err != nil {
return fmt.Errorf("failed to populate initial local discovery cache: %w", err)
} }
apiservers, _ := apiserversi.(*sync.Map) go func() {
return apiservers, nil for {
select {
case <-h.localDiscoveryCacheTicker.C:
klog.V(4).Infof("Invalidating local discovery cache")
if err := h.populateLocalDiscoveryCache(); err != nil {
klog.Errorf("Failed to repopulate local discovery cache: %v", err)
}
case <-stopCh:
klog.Info("localDiscoveryCacheInvalidation goroutine received stop signal")
if h.localDiscoveryCacheTicker != nil {
h.localDiscoveryCacheTicker.Stop()
klog.Info("localDiscoveryCacheTicker stopped")
}
klog.Info("localDiscoveryCacheInvalidation goroutine exiting")
return
}
}
}()
return nil
} }
func (h *peerProxyHandler) resolveServingLocation(apiservers *sync.Map) (bool, []string, error) { func (h *peerProxyHandler) populateLocalDiscoveryCache() error {
var peerServerEndpoints []string _, resourcesByGV, _, err := h.discoveryClient.GroupsAndMaybeResources()
var locallyServiceable bool if err != nil {
var respErr error return fmt.Errorf("error getting API group resources from discovery: %w", err)
}
apiservers.Range(func(key, value interface{}) bool { freshLocalDiscoveryResponse := map[schema.GroupVersionResource]bool{}
apiserverKey := key.(string) for gv, resources := range resourcesByGV {
if apiserverKey == h.serverId { if gv.Group == "" {
locallyServiceable = true gv.Group = "core"
// stop iteration and reset any errors encountered so far. }
respErr = nil for _, resource := range resources.APIResources {
gvr := gv.WithResource(resource.Name)
freshLocalDiscoveryResponse[gvr] = true
}
}
h.localDiscoveryInfoCache.Store(freshLocalDiscoveryResponse)
// Signal that the cache has been populated.
h.localDiscoveryInfoCachePopulatedOnce.Do(func() {
close(h.localDiscoveryInfoCachePopulated)
})
return nil
}
// shouldServeLocally checks if the requested resource is present in the local
// discovery cache indicating the request can be served by this server.
func (h *peerProxyHandler) shouldServeLocally(gvr schema.GroupVersionResource) bool {
cache := h.localDiscoveryInfoCache.Load().(map[schema.GroupVersionResource]bool)
exists, ok := cache[gvr]
if !ok {
klog.V(4).Infof("resource not found for %v in local discovery cache\n", gvr.GroupVersion())
return false return false
} }
hostPort, err := h.hostportInfo(apiserverKey) if exists {
if err != nil {
respErr = err
// continue with iteration
return true return true
} }
return false
}
func (h *peerProxyHandler) findServiceableByPeerFromPeerDiscoveryCache(gvr schema.GroupVersionResource) []string {
var serviceableByIDs []string
cache := h.peerDiscoveryInfoCache.Load().(map[string]map[schema.GroupVersionResource]bool)
for peerID, servedResources := range cache {
// Ignore local apiserver.
if peerID == h.serverID {
continue
}
exists, ok := servedResources[gvr]
if !ok {
continue
}
if exists {
serviceableByIDs = append(serviceableByIDs, peerID)
}
}
return serviceableByIDs
}
// resolveServingLocation resolves the host:port addresses for the given peer IDs.
func (h *peerProxyHandler) resolveServingLocation(peerIDs []string) ([]string, error) {
var peerServerEndpoints []string
var errs []error
for _, id := range peerIDs {
hostPort, err := h.hostportInfo(id)
if err != nil {
errs = append(errs, err)
continue
}
peerServerEndpoints = append(peerServerEndpoints, hostPort) peerServerEndpoints = append(peerServerEndpoints, hostPort)
return true }
})
// reset err if there was atleast one valid peer server found. // reset err if there was atleast one valid peer server found.
if len(peerServerEndpoints) > 0 { if len(peerServerEndpoints) > 0 {
respErr = nil errs = nil
} }
return locallyServiceable, peerServerEndpoints, respErr return peerServerEndpoints, errors.Join(errs...)
} }
func (h *peerProxyHandler) hostportInfo(apiserverKey string) (string, error) { func (h *peerProxyHandler) hostportInfo(apiserverKey string) (string, error) {
hostport, err := h.reconciler.GetEndpoint(apiserverKey) hostPort, err := h.reconciler.GetEndpoint(apiserverKey)
if err != nil {
return "", err
}
// check ip format
_, _, err = net.SplitHostPort(hostport)
if err != nil { if err != nil {
return "", err return "", err
} }
return hostport, nil _, _, err = net.SplitHostPort(hostPort)
if err != nil {
return "", err
}
return hostPort, nil
} }
func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) { func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) {
user, ok := apirequest.UserFrom(req.Context())
if !ok {
klog.Error("failed to get user info from request")
return
}
// write a new location based on the existing request pointed at the target service // write a new location based on the existing request pointed at the target service
location := &url.URL{} location := &url.URL{}
location.Scheme = "https" location.Scheme = "https"
@ -255,107 +327,29 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request,
newReq.Header.Add(PeerProxiedHeader, "true") newReq.Header.Add(PeerProxiedHeader, "true")
defer cancelFn() defer cancelFn()
proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport) proxyRoundTripper, err := h.buildProxyRoundtripper(req)
if err != nil {
klog.Errorf("failed to build proxy round tripper: %v", err)
return
}
delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw} delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw}
w := responsewriter.WrapForHTTP1Or2(delegate) w := responsewriter.WrapForHTTP1Or2(delegate)
handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()}) handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()})
handler.ServeHTTP(w, newReq) handler.ServeHTTP(w, newReq)
metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status())) metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status()))
} }
func (h *peerProxyHandler) buildProxyRoundtripper(req *http.Request) (http.RoundTripper, error) {
user, ok := apirequest.UserFrom(req.Context())
if !ok {
return nil, apierrors.NewBadRequest("no user details present in request")
}
return transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport), nil
}
func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
klog.ErrorS(err, "Error while proxying request to destination apiserver") klog.ErrorS(err, "Error while proxying request to destination apiserver")
http.Error(w, err.Error(), http.StatusServiceUnavailable) http.Error(w, err.Error(), http.StatusServiceUnavailable)
} }
// Adds a storageversion object to SVMap
func (h *peerProxyHandler) addSV(obj interface{}) {
sv, ok := obj.(*v1alpha1.StorageVersion)
if !ok {
klog.Error("Invalid StorageVersion provided to addSV()")
return
}
h.updateSVMap(nil, sv)
}
// Updates the SVMap to delete old storageversion and add new storageversion
func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) {
oldSV, ok := oldObj.(*v1alpha1.StorageVersion)
if !ok {
klog.Error("Invalid StorageVersion provided to updateSV()")
return
}
newSV, ok := newObj.(*v1alpha1.StorageVersion)
if !ok {
klog.Error("Invalid StorageVersion provided to updateSV()")
return
}
h.updateSVMap(oldSV, newSV)
}
// Deletes a storageversion object from SVMap
func (h *peerProxyHandler) deleteSV(obj interface{}) {
sv, ok := obj.(*v1alpha1.StorageVersion)
if !ok {
klog.Error("Invalid StorageVersion provided to deleteSV()")
return
}
h.updateSVMap(sv, nil)
}
// Delete old storageversion, add new storagversion
func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) {
if oldSV != nil {
h.deleteSVFromMap(oldSV)
}
if newSV != nil {
h.addSVToMap(newSV)
}
}
func (h *peerProxyHandler) deleteSVFromMap(sv *v1alpha1.StorageVersion) {
// The name of storageversion is <group>.<resource>
splitInd := strings.LastIndex(sv.Name, ".")
group := sv.Name[:splitInd]
resource := sv.Name[splitInd+1:]
gvr := schema.GroupVersionResource{Group: group, Resource: resource}
for _, gr := range sv.Status.StorageVersions {
for _, version := range gr.ServedVersions {
versionSplit := strings.Split(version, "/")
if len(versionSplit) == 2 {
version = versionSplit[1]
}
gvr.Version = version
h.svMap.Delete(gvr)
}
}
}
func (h *peerProxyHandler) addSVToMap(sv *v1alpha1.StorageVersion) {
// The name of storageversion is <group>.<resource>
splitInd := strings.LastIndex(sv.Name, ".")
group := sv.Name[:splitInd]
resource := sv.Name[splitInd+1:]
gvr := schema.GroupVersionResource{Group: group, Resource: resource}
for _, gr := range sv.Status.StorageVersions {
for _, version := range gr.ServedVersions {
// some versions have groups included in them, so get rid of the groups
versionSplit := strings.Split(version, "/")
if len(versionSplit) == 2 {
version = versionSplit[1]
}
gvr.Version = version
apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
apiservers := apiserversi.(*sync.Map)
apiservers.Store(gr.APIServerID, true)
}
}
}


@ -19,7 +19,6 @@ package peerproxy
import ( import (
"net/http" "net/http"
"strings" "strings"
"sync"
"testing" "testing"
"time" "time"
@ -27,42 +26,39 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/apitesting" "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/authentication/user"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/reconcilers" "k8s.io/apiserver/pkg/reconcilers"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/peerproxy/metrics" "k8s.io/apiserver/pkg/util/peerproxy/metrics"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport" "k8s.io/client-go/transport"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil" "k8s.io/component-base/metrics/testutil"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
apifilters "k8s.io/apiserver/pkg/endpoints/filters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
) )
const ( const (
requestTimeout = 30 * time.Second requestTimeout = 30 * time.Second
localServerID = "local-apiserver" localServerID = "local-apiserver"
remoteServerID = "remote-apiserver" remoteServerID1 = "remote-apiserver-1"
remoteServerID2 = "remote-apiserver-2"
) )
type FakeSVMapData struct {
gvr schema.GroupVersionResource
serverIDs []string
}
type server struct { type server struct {
publicIP string publicIP string
serverID string serverID string
@ -76,205 +72,138 @@ type reconciler struct {
func TestPeerProxy(t *testing.T) { func TestPeerProxy(t *testing.T) {
testCases := []struct { testCases := []struct {
desc string desc string
svdata FakeSVMapData
informerFinishedSync bool informerFinishedSync bool
requestPath string requestPath string
peerproxiedHeader string peerproxiedHeader string
expectedStatus int
metrics []string
want string
reconcilerConfig reconciler reconcilerConfig reconciler
localCache map[schema.GroupVersionResource]bool
peerCache map[string]map[schema.GroupVersionResource]bool
wantStatus int
wantMetricsData string
}{ }{
{ {
desc: "allow non resource requests", desc: "allow non resource requests",
requestPath: "/foo/bar/baz", requestPath: "/foo/bar/baz",
expectedStatus: http.StatusOK, wantStatus: http.StatusOK,
}, },
{ {
desc: "allow if already proxied once", desc: "allow if already proxied once",
requestPath: "/api/bar/baz", requestPath: "/api/bar/baz",
expectedStatus: http.StatusOK,
peerproxiedHeader: "true", peerproxiedHeader: "true",
wantStatus: http.StatusOK,
}, },
{ {
desc: "allow if unsynced informers", desc: "allow if unsynced informers",
requestPath: "/api/bar/baz", requestPath: "/api/bar/baz",
expectedStatus: http.StatusOK,
informerFinishedSync: false, informerFinishedSync: false,
wantStatus: http.StatusOK,
}, },
{ {
desc: "allow if no storage version found", desc: "Serve locally if serviceable",
requestPath: "/api/bar/baz", requestPath: "/api/foo/bar",
expectedStatus: http.StatusOK, localCache: map[schema.GroupVersionResource]bool{
informerFinishedSync: true, {Group: "core", Version: "foo", Resource: "bar"}: true,
},
wantStatus: http.StatusOK,
}, },
{ {
// since if no server id is found, we pass request to next handler desc: "200 if no appropriate peers found, serve locally",
//, and our last handler in local chain is an http ok handler requestPath: "/api/foo/bar",
desc: "200 if no serverid found",
requestPath: "/api/bar/baz",
expectedStatus: http.StatusOK,
informerFinishedSync: true, informerFinishedSync: true,
svdata: FakeSVMapData{ wantStatus: http.StatusOK,
gvr: schema.GroupVersionResource{
Group: "core",
Version: "bar",
Resource: "baz"},
serverIDs: []string{}},
}, },
{ {
desc: "503 if no endpoint fetched from lease", desc: "503 if no endpoint fetched from lease",
requestPath: "/api/foo/bar", requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true, informerFinishedSync: true,
svdata: FakeSVMapData{ peerCache: map[string]map[schema.GroupVersionResource]bool{
gvr: schema.GroupVersionResource{ remoteServerID1: {
Group: "core", {Group: "core", Version: "foo", Resource: "bar"}: true,
Version: "foo",
Resource: "bar"},
serverIDs: []string{remoteServerID}},
}, },
{ },
desc: "200 if locally serviceable", wantStatus: http.StatusServiceUnavailable,
requestPath: "/api/foo/bar",
expectedStatus: http.StatusOK,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverIDs: []string{localServerID}},
}, },
{ {
desc: "503 unreachable peer bind address", desc: "503 unreachable peer bind address",
requestPath: "/api/foo/bar", requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true, informerFinishedSync: true,
svdata: FakeSVMapData{ peerCache: map[string]map[schema.GroupVersionResource]bool{
gvr: schema.GroupVersionResource{ remoteServerID1: {
Group: "core", {Group: "core", Version: "foo", Resource: "bar"}: true,
Version: "foo", },
Resource: "bar"}, },
serverIDs: []string{remoteServerID}},
reconcilerConfig: reconciler{ reconcilerConfig: reconciler{
do: true, do: true,
servers: []server{ servers: []server{
{ {
publicIP: "1.2.3.4", publicIP: "1.2.3.4",
serverID: remoteServerID, serverID: remoteServerID1,
}, },
}, },
}, },
metrics: []string{ wantStatus: http.StatusServiceUnavailable,
"apiserver_rerouted_request_total", wantMetricsData: `
},
want: `
# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it # HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
# TYPE apiserver_rerouted_request_total counter # TYPE apiserver_rerouted_request_total counter
apiserver_rerouted_request_total{code="503"} 1 apiserver_rerouted_request_total{code="503"} 1
`, `,
}, },
{
desc: "503 unreachable peer public address",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverIDs: []string{remoteServerID}},
reconcilerConfig: reconciler{
do: true,
servers: []server{
{
publicIP: "1.2.3.4",
serverID: remoteServerID,
},
},
},
metrics: []string{
"apiserver_rerouted_request_total",
},
want: `
# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
# TYPE apiserver_rerouted_request_total counter
apiserver_rerouted_request_total{code="503"} 2
`,
},
{ {
desc: "503 if one apiserver's endpoint lease wasnt found but another valid (unreachable) apiserver was found", desc: "503 if one apiserver's endpoint lease wasnt found but another valid (unreachable) apiserver was found",
requestPath: "/api/foo/bar", requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true, informerFinishedSync: true,
svdata: FakeSVMapData{ peerCache: map[string]map[schema.GroupVersionResource]bool{
gvr: schema.GroupVersionResource{ remoteServerID1: {
Group: "core", {Group: "core", Version: "foo", Resource: "bar"}: true,
Version: "foo", },
Resource: "bar"}, remoteServerID2: {
serverIDs: []string{"aggregated-apiserver", remoteServerID}}, {Group: "core", Version: "foo", Resource: "bar"}: true,
},
},
reconcilerConfig: reconciler{ reconcilerConfig: reconciler{
do: true, do: true,
servers: []server{ servers: []server{
{ {
publicIP: "1.2.3.4", publicIP: "1.2.3.4",
serverID: remoteServerID, serverID: remoteServerID1,
},
},
},
},
{
desc: "503 if all peers had invalid host:port info",
requestPath: "/api/foo/bar",
expectedStatus: http.StatusServiceUnavailable,
informerFinishedSync: true,
svdata: FakeSVMapData{
gvr: schema.GroupVersionResource{
Group: "core",
Version: "foo",
Resource: "bar"},
serverIDs: []string{"aggregated-apiserver", remoteServerID}},
reconcilerConfig: reconciler{
do: true,
servers: []server{
{
publicIP: "1[2.4",
serverID: "aggregated-apiserver",
},
{
publicIP: "2.4]6",
serverID: remoteServerID,
}, },
}, },
}, },
wantStatus: http.StatusServiceUnavailable,
wantMetricsData: `
# HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
# TYPE apiserver_rerouted_request_total counter
apiserver_rerouted_request_total{code="503"} 1
`,
}, },
} }
metrics.Register() metrics.Register()
for _, tt := range testCases { for _, tt := range testCases {
t.Run(tt.desc, func(t *testing.T) { t.Run(tt.desc, func(t *testing.T) {
defer metrics.Reset()
lastHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { lastHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK")) w.Write([]byte("OK"))
}) })
reconciler := newFakePeerEndpointReconciler(t) serverIDs := []string{localServerID}
handler := newHandlerChain(t, lastHandler, reconciler, tt.informerFinishedSync, tt.svdata) for peerID := range tt.peerCache {
serverIDs = append(serverIDs, peerID)
}
fakeReconciler := newFakePeerEndpointReconciler(t)
handler := newHandlerChain(t, tt.informerFinishedSync, lastHandler, fakeReconciler, tt.localCache, tt.peerCache)
server, requestGetter := createHTTP2ServerWithClient(handler, requestTimeout*2) server, requestGetter := createHTTP2ServerWithClient(handler, requestTimeout*2)
defer server.Close() defer server.Close()
if tt.reconcilerConfig.do { if tt.reconcilerConfig.do {
// need to enable feature flags first
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)
for _, server := range tt.reconcilerConfig.servers { for _, s := range tt.reconcilerConfig.servers {
err := reconciler.UpdateLease(server.serverID, err := fakeReconciler.UpdateLease(s.serverID,
server.publicIP, s.publicIP,
[]corev1.EndpointPort{{Name: "foo", []corev1.EndpointPort{{Name: "foo",
Port: 8080, Protocol: "TCP"}}) Port: 8080, Protocol: "TCP"}})
if err != nil { if err != nil {
t.Fatalf("failed to update peer endpoint lease - %v", err) t.Errorf("Failed to update lease for server %s", s.serverID)
} }
} }
} }
@ -285,17 +214,14 @@ func TestPeerProxy(t *testing.T) {
} }
req.Header.Set(PeerProxiedHeader, tt.peerproxiedHeader) req.Header.Set(PeerProxiedHeader, tt.peerproxiedHeader)
resp, err := requestGetter(req) resp, _ := requestGetter(req)
if err != nil {
t.Fatalf("unexpected error trying to get the request: %v", err)
}
// compare response // compare response
assert.Equal(t, tt.expectedStatus, resp.StatusCode) assert.Equal(t, tt.wantStatus, resp.StatusCode)
// compare metric // compare metric
if tt.want != "" { if tt.wantMetricsData != "" {
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.want), tt.metrics...); err != nil { if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetricsData), []string{"apiserver_rerouted_request_total"}...); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -324,10 +250,12 @@ func newFakePeerEndpointReconciler(t *testing.T) reconcilers.PeerEndpointLeaseRe
return reconciler return reconciler
} }
func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.PeerEndpointLeaseReconciler, informerFinishedSync bool, svdata FakeSVMapData) http.Handler { func newHandlerChain(t *testing.T, informerFinishedSync bool, handler http.Handler,
reconciler reconcilers.PeerEndpointLeaseReconciler,
localCache map[schema.GroupVersionResource]bool, peerCache map[string]map[schema.GroupVersionResource]bool) http.Handler {
// Add peerproxy handler // Add peerproxy handler
s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion() s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
peerProxyHandler, err := newFakePeerProxyHandler(reconciler, svdata, localServerID, s) peerProxyHandler, err := newFakePeerProxyHandler(informerFinishedSync, reconciler, localServerID, s, localCache, peerCache)
if err != nil { if err != nil {
t.Fatalf("Error creating peer proxy handler: %v", err) t.Fatalf("Error creating peer proxy handler: %v", err)
} }
@ -343,36 +271,28 @@ func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.
return handler return handler
} }
func newFakePeerProxyHandler(reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) { func newFakePeerProxyHandler(informerFinishedSync bool,
reconciler reconcilers.PeerEndpointLeaseReconciler, id string, s runtime.NegotiatedSerializer,
localCache map[schema.GroupVersionResource]bool, peerCache map[string]map[schema.GroupVersionResource]bool) (*peerProxyHandler, error) {
clientset := fake.NewSimpleClientset() clientset := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(clientset, 0) informerFactory := informers.NewSharedInformerFactory(clientset, 0)
leaseInformer := informerFactory.Coordination().V1().Leases()
clientConfig := &transport.Config{ clientConfig := &transport.Config{
TLS: transport.TLSConfig{ TLS: transport.TLSConfig{
Insecure: false, Insecure: false,
}} }}
proxyRoundTripper, err := transport.New(clientConfig) loopbackClientConfig := &rest.Config{
Host: "localhost:1010",
}
ppH, err := NewPeerProxyHandler(id, "identity=testserver", leaseInformer, reconciler, s, loopbackClientConfig, clientConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s) ppH.localDiscoveryInfoCache.Store(localCache)
if testDataExists(svdata.gvr) { ppH.peerDiscoveryInfoCache.Store(peerCache)
ppI.addToStorageVersionMap(svdata.gvr, svdata.serverIDs)
}
return ppI, nil
}
func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverIDs []string) { ppH.finishedSync.Store(informerFinishedSync)
apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{}) return ppH, nil
apiservers := apiserversi.(*sync.Map)
for _, serverID := range serverIDs {
if serverID != "" {
apiservers.Store(serverID, true)
}
}
}
func testDataExists(gvr schema.GroupVersionResource) bool {
return gvr.Group != "" && gvr.Version != "" && gvr.Resource != ""
} }
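Aside: the rewritten test seeds the handler with a local served-GVR set (localCache) and a per-peer served-GVR map (peerCache). The sketch below shows the routing decision those fixtures exercise, assuming the handler first consults the local cache and otherwise collects the peers that advertise the resource; it is an illustrative helper, not the handler's actual method, and the expectation that an empty peer list falls through to the next handler (while unreachable or lease-less peers yield 503) is taken from the test cases above.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
)

// pickPeers mirrors the decision the test fixtures exercise: serve locally
// when the local cache advertises the GVR, otherwise return the peer server
// IDs whose caches advertise it.
func pickPeers(
	gvr schema.GroupVersionResource,
	localCache map[schema.GroupVersionResource]bool,
	peerCache map[string]map[schema.GroupVersionResource]bool,
) (serveLocally bool, peers []string) {
	if localCache[gvr] {
		return true, nil
	}
	for serverID, served := range peerCache {
		if served[gvr] {
			peers = append(peers, serverID)
		}
	}
	return false, peers
}

func main() {
	gvr := schema.GroupVersionResource{Group: "core", Version: "foo", Resource: "bar"}
	peerCache := map[string]map[schema.GroupVersionResource]bool{
		"remote-apiserver-1": {gvr: true},
	}
	local, peers := pickPeers(gvr, nil, peerCache)
	fmt.Println(local, peers) // false [remote-apiserver-1]
}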
func withFakeUser(handler http.Handler) http.Handler { func withFakeUser(handler http.Handler) http.Handler {

View File

@ -31,14 +31,12 @@ import (
"k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/transport" "k8s.io/client-go/transport"
"k8s.io/client-go/util/cert" "k8s.io/client-go/util/cert"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversiongc"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver" controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
@ -48,7 +46,6 @@ import (
) )
func TestPeerProxiedRequest(t *testing.T) { func TestPeerProxiedRequest(t *testing.T) {
ktesting.SetDefaultVerbosity(1) ktesting.SetDefaultVerbosity(1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer func() { defer func() {
@ -60,7 +57,6 @@ func TestPeerProxiedRequest(t *testing.T) {
// enable feature flags // enable feature flags
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)
// create sharedetcd // create sharedetcd
@ -111,7 +107,6 @@ func TestPeerProxiedRequest(t *testing.T) {
} }
func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) { func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
ktesting.SetDefaultVerbosity(1) ktesting.SetDefaultVerbosity(1)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer func() { defer func() {
@ -123,7 +118,6 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
// enable feature flags // enable feature flags
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)
// create sharedetcd // create sharedetcd
@ -133,8 +127,7 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
proxyCA, err := createProxyCertContent() proxyCA, err := createProxyCertContent()
require.NoError(t, err) require.NoError(t, err)
// set lease duration to 1s for serverA to ensure that storageversions for serverA are updated // modify lease parameters so that leases are garbage collected in a timely manner.
// once it is shutdown
controlplaneapiserver.IdentityLeaseDurationSeconds = 10 controlplaneapiserver.IdentityLeaseDurationSeconds = 10
controlplaneapiserver.IdentityLeaseGCPeriod = 2 * time.Second controlplaneapiserver.IdentityLeaseGCPeriod = 2 * time.Second
controlplaneapiserver.IdentityLeaseRenewIntervalPeriod = time.Second controlplaneapiserver.IdentityLeaseRenewIntervalPeriod = time.Second
@ -146,10 +139,7 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{"--runtime-config=api/all=true"}, etcd) serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{"--runtime-config=api/all=true"}, etcd)
kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig) kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
require.NoError(t, err) require.NoError(t, err)
// ensure storageversion garbage collector ctlr is set up
informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second)
informersACtx, informersACancel := context.WithCancel(ctx)
setupStorageVersionGC(informersACtx, kubeClientSetA, informersA)
// reset lease duration to default value for serverB and serverC since we will not be // reset lease duration to default value for serverB and serverC since we will not be
// shutting these down // shutting these down
controlplaneapiserver.IdentityLeaseDurationSeconds = 3600 controlplaneapiserver.IdentityLeaseDurationSeconds = 3600
@ -163,9 +153,6 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
t.Cleanup(serverB.TearDownFn) t.Cleanup(serverB.TearDownFn)
kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig) kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
require.NoError(t, err) require.NoError(t, err)
// ensure storageversion garbage collector ctlr is set up
informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second)
setupStorageVersionGC(ctx, kubeClientSetB, informersB)
// start serverC with all APIs enabled // start serverC with all APIs enabled
// override hostname to ensure unique ips // override hostname to ensure unique ips
@ -181,7 +168,6 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
klog.Infof("\nServerA has created jobs\n") klog.Infof("\nServerA has created jobs\n")
// shutdown serverA // shutdown serverA
informersACancel()
serverA.TearDownFn() serverA.TearDownFn()
var jobsB *v1.JobList var jobsB *v1.JobList
@ -212,16 +198,6 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
assert.Equal(t, job.Name, jobsB.Items[0].Name) assert.Equal(t, job.Name, jobsB.Items[0].Name)
} }
func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) {
leaseInformer := informers.Coordination().V1().Leases()
storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
go leaseInformer.Informer().Run(ctx.Done())
go storageVersionInformer.Informer().Run(ctx.Done())
controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer)
go controller.Run(ctx)
}
func createProxyCertContent() (kastesting.ProxyCA, error) { func createProxyCertContent() (kastesting.ProxyCA, error) {
result := kastesting.ProxyCA{} result := kastesting.ProxyCA{}
proxySigningKey, err := testutil.NewPrivateKey() proxySigningKey, err := testutil.NewPrivateKey()

View File

@ -38,7 +38,6 @@ import (
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver" controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
@ -85,7 +84,7 @@ func TestCreateLeaseOnStart(t *testing.T) {
leases, err := kubeclient. leases, err := kubeclient.
CoordinationV1(). CoordinationV1().
Leases(metav1.NamespaceSystem). Leases(metav1.NamespaceSystem).
List(context.TODO(), metav1.ListOptions{LabelSelector: controlplaneapiserver.IdentityLeaseComponentLabelKey + "=" + controlplane.KubeAPIServer}) List(context.TODO(), metav1.ListOptions{LabelSelector: controlplaneapiserver.IdentityLeaseComponentLabelKey + "=" + controlplaneapiserver.KubeAPIServer})
if err != nil { if err != nil {
return false, err return false, err
} }
@ -207,7 +206,7 @@ func newTestLease(acquireTime time.Time, namespace string) *coordinationv1.Lease
Name: testLeaseName, Name: testLeaseName,
Namespace: namespace, Namespace: namespace,
Labels: map[string]string{ Labels: map[string]string{
controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer, controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplaneapiserver.KubeAPIServer,
}, },
}, },
Spec: coordinationv1.LeaseSpec{ Spec: coordinationv1.LeaseSpec{
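Aside: the identity leases asserted on in this test can be listed with the same component label selector. A minimal sketch, assuming an already-constructed clientset; the package name is hypothetical, while the two exported constants are the ones referenced in the diff.

package identitylease // hypothetical package name for illustration

import (
	"context"

	coordinationv1 "k8s.io/api/coordination/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
)

// ListKubeAPIServerIdentityLeases lists the identity leases published by
// kube-apiserver instances, selected by the component label used above.
func ListKubeAPIServerIdentityLeases(ctx context.Context, client kubernetes.Interface) (*coordinationv1.LeaseList, error) {
	selector := controlplaneapiserver.IdentityLeaseComponentLabelKey + "=" + controlplaneapiserver.KubeAPIServer
	return client.CoordinationV1().
		Leases(metav1.NamespaceSystem).
		List(ctx, metav1.ListOptions{LabelSelector: selector})
}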

View File

@ -36,7 +36,6 @@ import (
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/storageversiongc" "k8s.io/kubernetes/pkg/controller/storageversiongc"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver" controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
@ -177,7 +176,7 @@ func createTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface,
Name: name, Name: name,
Namespace: metav1.NamespaceSystem, Namespace: metav1.NamespaceSystem,
Labels: map[string]string{ Labels: map[string]string{
controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer, controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplaneapiserver.KubeAPIServer,
}, },
}, },
Spec: coordinationv1.LeaseSpec{ Spec: coordinationv1.LeaseSpec{