From 8b2cee83c15b1fbc304123902e1bd8e8e06f0a12 Mon Sep 17 00:00:00 2001 From: Richa Banker Date: Mon, 28 Oct 2024 23:04:26 -0700 Subject: [PATCH] Replace StorageVersion API with aggregated discovery to fetch served resources by a peer for MVP Co-authored-by: Joe Betz Co-authored-by: Jordan Liggitt --- .../storageversiongc/gc_controller.go | 7 +- pkg/controlplane/apiserver/config.go | 13 +- .../apiserver/options/validation.go | 11 - .../apiserver/options/validation_test.go | 29 -- pkg/controlplane/apiserver/peer.go | 40 +- pkg/controlplane/apiserver/server.go | 17 +- pkg/controlplane/instance.go | 4 +- .../pkg/util/peerproxy/metrics/metrics.go | 5 + .../pkg/util/peerproxy/peer_discovery.go | 198 +++++++++ .../pkg/util/peerproxy/peer_discovery_test.go | 344 ++++++++++++++++ .../apiserver/pkg/util/peerproxy/peerproxy.go | 166 ++++++-- .../pkg/util/peerproxy/peerproxy_handler.go | 382 +++++++++--------- .../util/peerproxy/peerproxy_handler_test.go | 272 +++++-------- .../apiserver/peerproxy/peer_proxy_test.go | 28 +- .../controlplane/apiserver_identity_test.go | 5 +- test/integration/storageversion/gc_test.go | 3 +- 16 files changed, 1031 insertions(+), 493 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go create mode 100644 staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go diff --git a/pkg/controller/storageversiongc/gc_controller.go b/pkg/controller/storageversiongc/gc_controller.go index 3356a25e22d..71be919c0a6 100644 --- a/pkg/controller/storageversiongc/gc_controller.go +++ b/pkg/controller/storageversiongc/gc_controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controlplane" + "k8s.io/kubernetes/pkg/controlplane/apiserver" "k8s.io/klog/v2" ) @@ -214,7 +215,7 @@ func (c *Controller) syncStorageVersion(ctx context.Context, name string) error for _, v := range sv.Status.StorageVersions { lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(ctx, v.APIServerID, metav1.GetOptions{}) if err != nil || lease == nil || lease.Labels == nil || - lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { + lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != apiserver.KubeAPIServer { // We cannot find a corresponding identity lease from apiserver as well. // We need to clean up this storage version. 
hasInvalidID = true @@ -243,7 +244,7 @@ func (c *Controller) enqueueStorageVersion(logger klog.Logger, obj *apiserverint for _, sv := range obj.Status.StorageVersions { lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID) if err != nil || lease == nil || lease.Labels == nil || - lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != controlplane.KubeAPIServer { + lease.Labels[controlplane.IdentityLeaseComponentLabelKey] != apiserver.KubeAPIServer { // we cannot find a corresponding identity lease in cache, enqueue the storageversion logger.V(4).Info("Observed storage version with invalid apiserver entry", "objName", obj.Name) c.storageVersionQueue.Add(obj.Name) @@ -269,7 +270,7 @@ func (c *Controller) onDeleteLease(logger klog.Logger, obj interface{}) { if castObj.Namespace == metav1.NamespaceSystem && castObj.Labels != nil && - castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer { + castObj.Labels[controlplane.IdentityLeaseComponentLabelKey] == apiserver.KubeAPIServer { logger.V(4).Info("Observed lease deleted", "castObjName", castObj.Name) c.enqueueLease(castObj) } diff --git a/pkg/controlplane/apiserver/config.go b/pkg/controlplane/apiserver/config.go index 7a1b42ee127..93877a2eac5 100644 --- a/pkg/controlplane/apiserver/config.go +++ b/pkg/controlplane/apiserver/config.go @@ -313,10 +313,17 @@ func CreateConfig( if err != nil { return nil, nil, err } - // build peer proxy config only if peer ca file exists if opts.PeerCAFile != "" { - config.PeerProxy, err = BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile, - opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.Extra.PeerEndpointLeaseReconciler, config.Generic.Serializer) + leaseInformer := versionedInformers.Coordination().V1().Leases() + config.PeerProxy, err = BuildPeerProxy( + leaseInformer, + genericConfig.LoopbackClientConfig, + opts.ProxyClientCertFile, + opts.ProxyClientKeyFile, opts.PeerCAFile, + opts.PeerAdvertiseAddress, + genericConfig.APIServerID, + config.Extra.PeerEndpointLeaseReconciler, + config.Generic.Serializer) if err != nil { return nil, nil, err } diff --git a/pkg/controlplane/apiserver/options/validation.go b/pkg/controlplane/apiserver/options/validation.go index 36e34d13b40..6670bb68d4a 100644 --- a/pkg/controlplane/apiserver/options/validation.go +++ b/pkg/controlplane/apiserver/options/validation.go @@ -78,16 +78,6 @@ func validateNodeSelectorAuthorizationFeature() []error { return nil } -func validateUnknownVersionInteroperabilityProxyFeature() []error { - if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { - return nil - } - return []error{fmt.Errorf("UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled")} - } - return nil -} - func validateUnknownVersionInteroperabilityProxyFlags(options *Options) []error { err := []error{} if !utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { @@ -142,7 +132,6 @@ func (s *Options) Validate() []error { errs = append(errs, s.APIEnablement.Validate(legacyscheme.Scheme, apiextensionsapiserver.Scheme, aggregatorscheme.Scheme)...) errs = append(errs, validateTokenRequest(s)...) errs = append(errs, s.Metrics.Validate()...) - errs = append(errs, validateUnknownVersionInteroperabilityProxyFeature()...) 
errs = append(errs, validateUnknownVersionInteroperabilityProxyFlags(s)...) errs = append(errs, validateNodeSelectorAuthorizationFeature()...) errs = append(errs, validateServiceAccountTokenSigningConfig(s)...) diff --git a/pkg/controlplane/apiserver/options/validation_test.go b/pkg/controlplane/apiserver/options/validation_test.go index 2ff582b6e77..fccb597da95 100644 --- a/pkg/controlplane/apiserver/options/validation_test.go +++ b/pkg/controlplane/apiserver/options/validation_test.go @@ -26,7 +26,6 @@ import ( genericoptions "k8s.io/apiserver/pkg/server/options" utilfeature "k8s.io/apiserver/pkg/util/feature" basecompatibility "k8s.io/component-base/compatibility" - "k8s.io/component-base/featuregate" basemetrics "k8s.io/component-base/metrics" "k8s.io/kubernetes/pkg/features" @@ -165,34 +164,6 @@ func TestValidateUnknownVersionInteroperabilityProxy(t *testing.T) { } } -func TestValidateUnknownVersionInteroperabilityProxyFeature(t *testing.T) { - const conflict = "UnknownVersionInteroperabilityProxy feature requires StorageVersionAPI feature flag to be enabled" - tests := []struct { - name string - featuresEnabled []featuregate.Feature - }{ - { - name: "enabled: UnknownVersionInteroperabilityProxy, disabled: StorageVersionAPI", - featuresEnabled: []featuregate.Feature{features.UnknownVersionInteroperabilityProxy}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - for _, feature := range test.featuresEnabled { - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, feature, true) - } - var errMessageGot string - if errs := validateUnknownVersionInteroperabilityProxyFeature(); len(errs) > 0 { - errMessageGot = errs[0].Error() - } - if !strings.Contains(errMessageGot, conflict) { - t.Errorf("Expected error message to contain: %q, but got: %q", conflict, errMessageGot) - } - }) - } -} - func TestValidateOptions(t *testing.T) { testCases := []struct { name string diff --git a/pkg/controlplane/apiserver/peer.go b/pkg/controlplane/apiserver/peer.go index e15b8980c8d..9806bf25701 100644 --- a/pkg/controlplane/apiserver/peer.go +++ b/pkg/controlplane/apiserver/peer.go @@ -24,13 +24,13 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/reconcilers" + "k8s.io/client-go/rest" + "k8s.io/client-go/transport" + genericapiserver "k8s.io/apiserver/pkg/server" serverstorage "k8s.io/apiserver/pkg/server/storage" - "k8s.io/apiserver/pkg/storageversion" utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy" - clientgoinformers "k8s.io/client-go/informers" - "k8s.io/client-go/transport" - "k8s.io/klog/v2" + coordinationv1informers "k8s.io/client-go/informers/coordination/v1" api "k8s.io/kubernetes/pkg/apis/core" ) @@ -43,17 +43,24 @@ const ( DefaultPeerEndpointReconcilerTTL = 15 * time.Second ) -func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, svm storageversion.Manager, - proxyClientCertFile string, proxyClientKeyFile string, peerCAFile string, peerAdvertiseAddress reconcilers.PeerAdvertiseAddress, - apiServerID string, reconciler reconcilers.PeerEndpointLeaseReconciler, serializer runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) { +func BuildPeerProxy( + leaseInformer coordinationv1informers.LeaseInformer, + loopbackClientConfig *rest.Config, + proxyClientCertFile string, + proxyClientKeyFile string, + peerCAFile string, + peerAdvertiseAddress reconcilers.PeerAdvertiseAddress, + apiServerID string, + reconciler reconcilers.PeerEndpointLeaseReconciler, + serializer 
runtime.NegotiatedSerializer) (utilpeerproxy.Interface, error) { if proxyClientCertFile == "" { return nil, fmt.Errorf("error building peer proxy handler, proxy-cert-file not specified") } if proxyClientKeyFile == "" { return nil, fmt.Errorf("error building peer proxy handler, proxy-key-file not specified") } - // create proxy client config - clientConfig := &transport.Config{ + + proxyClientConfig := &transport.Config{ TLS: transport.TLSConfig{ Insecure: false, CertFile: proxyClientCertFile, @@ -62,20 +69,15 @@ func BuildPeerProxy(versionedInformer clientgoinformers.SharedInformerFactory, s ServerName: "kubernetes.default.svc", }} - // build proxy transport - proxyRoundTripper, transportBuildingError := transport.New(clientConfig) - if transportBuildingError != nil { - klog.Error(transportBuildingError.Error()) - return nil, transportBuildingError - } return utilpeerproxy.NewPeerProxyHandler( - versionedInformer, - svm, - proxyRoundTripper, apiServerID, + IdentityLeaseComponentLabelKey+"="+KubeAPIServer, + leaseInformer, reconciler, serializer, - ), nil + loopbackClientConfig, + proxyClientConfig, + ) } // CreatePeerEndpointLeaseReconciler creates a apiserver endpoint lease reconciliation loop diff --git a/pkg/controlplane/apiserver/server.go b/pkg/controlplane/apiserver/server.go index dd36d9977f7..782a6d15017 100644 --- a/pkg/controlplane/apiserver/server.go +++ b/pkg/controlplane/apiserver/server.go @@ -74,6 +74,8 @@ const ( // 1. the lease is an identity lease (different from leader election leases) // 2. which component owns this lease IdentityLeaseComponentLabelKey = "apiserver.kubernetes.io/identity" + // KubeAPIServer defines variable used internally when referring to kube-apiserver component + KubeAPIServer = "kube-apiserver" ) // Server is a struct that contains a generic control plane apiserver instance @@ -212,7 +214,20 @@ func (c completedConfig) New(name string, delegationTarget genericapiserver.Dele return nil }) if c.Extra.PeerProxy != nil { - s.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error { + // Run local-discovery sync loop + s.GenericAPIServer.AddPostStartHookOrDie("local-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error { + err := c.Extra.PeerProxy.RunLocalDiscoveryCacheSync(context.Done()) + return err + }) + + // Run peer-discovery sync loop. + s.GenericAPIServer.AddPostStartHookOrDie("peer-discovery-cache-sync", func(context genericapiserver.PostStartHookContext) error { + go c.Extra.PeerProxy.RunPeerDiscoveryCacheSync(context, 1) + return nil + }) + + // Wait for handler to be ready. + s.GenericAPIServer.AddPostStartHookOrDie("mixed-version-proxy-handler", func(context genericapiserver.PostStartHookContext) error { err := c.Extra.PeerProxy.WaitForCacheSync(context.Done()) return err }) diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 255157707e6..b90bb877a55 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -115,8 +115,6 @@ const ( // 2. 
which component owns this lease // TODO(sttts): remove this indirection IdentityLeaseComponentLabelKey = controlplaneapiserver.IdentityLeaseComponentLabelKey - // KubeAPIServer defines variable used internally when referring to kube-apiserver component - KubeAPIServer = "kube-apiserver" // repairLoopInterval defines the interval used to run the Services ClusterIP and NodePort repair loops repairLoopInterval = 3 * time.Minute ) @@ -312,7 +310,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget) return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig") } - cp, err := c.ControlPlane.New(KubeAPIServer, delegationTarget) + cp, err := c.ControlPlane.New(controlplaneapiserver.KubeAPIServer, delegationTarget) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go index 48b89be75ff..9b7aee4eaa1 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/metrics/metrics.go @@ -50,6 +50,11 @@ func Register() { }) } +// Only used for tests. +func Reset() { + legacyregistry.Reset() +} + // IncPeerProxiedRequest increments the # of proxied requests to peer kube-apiserver func IncPeerProxiedRequest(ctx context.Context, status string) { peerProxiedRequestsTotal.WithContext(ctx).WithLabelValues(status).Add(1) diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go new file mode 100644 index 00000000000..8ceef28bb0f --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery.go @@ -0,0 +1,198 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package peerproxy + +import ( + "context" + "fmt" + "net/http" + "time" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/authentication/user" + "k8s.io/client-go/discovery" + "k8s.io/klog/v2" + + apidiscoveryv2 "k8s.io/api/apidiscovery/v2" + v1 "k8s.io/api/coordination/v1" + schema "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + responsewriterutil "k8s.io/apiserver/pkg/util/responsewriter" +) + +const ( + controllerName = "peer-discovery-cache-sync" + maxRetries = 5 +) + +func (h *peerProxyHandler) RunPeerDiscoveryCacheSync(ctx context.Context, workers int) { + defer utilruntime.HandleCrash() + defer h.peerLeaseQueue.ShutDown() + defer func() { + err := h.apiserverIdentityInformer.Informer().RemoveEventHandler(h.leaseRegistration) + if err != nil { + klog.Warning("error removing leaseInformer eventhandler") + } + }() + + klog.Infof("Workers: %d", workers) + for i := 0; i < workers; i++ { + klog.Infof("Starting worker") + go wait.UntilWithContext(ctx, h.runWorker, time.Second) + } + <-ctx.Done() +} + +func (h *peerProxyHandler) enqueueLease(lease *v1.Lease) { + h.peerLeaseQueue.Add(lease.Name) +} + +func (h *peerProxyHandler) runWorker(ctx context.Context) { + for h.processNextElectionItem(ctx) { + } +} + +func (h *peerProxyHandler) processNextElectionItem(ctx context.Context) bool { + key, shutdown := h.peerLeaseQueue.Get() + if shutdown { + return false + } + defer h.peerLeaseQueue.Done(key) + + err := h.syncPeerDiscoveryCache(ctx) + h.handleErr(err, key) + return true +} + +func (h *peerProxyHandler) syncPeerDiscoveryCache(ctx context.Context) error { + var fetchDiscoveryErr error + // Rebuild the peer discovery cache from available leases. + leases, err := h.apiserverIdentityInformer.Lister().List(h.identityLeaseLabelSelector) + if err != nil { + utilruntime.HandleError(err) + return err + } + + newCache := map[string]map[schema.GroupVersionResource]bool{} + for _, l := range leases { + _, ok := h.isValidPeerIdentityLease(l) + if !ok { + continue + } + + discoveryInfo, err := h.fetchNewDiscoveryFor(ctx, l.Name, *l.Spec.HolderIdentity) + if err != nil { + fetchDiscoveryErr = err + } + + if discoveryInfo != nil { + newCache[l.Name] = discoveryInfo + } + } + + // Overwrite cache with new contents. 
+	h.peerDiscoveryInfoCache.Store(newCache)
+	return fetchDiscoveryErr
+}
+
+func (h *peerProxyHandler) fetchNewDiscoveryFor(ctx context.Context, serverID string, holderIdentity string) (map[schema.GroupVersionResource]bool, error) {
+	hostport, err := h.hostportInfo(serverID)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get host port info from identity lease for server %s: %w", serverID, err)
+	}
+
+	klog.V(4).Infof("Proxying an agg-discovery call from %s to %s", h.serverID, serverID)
+	servedResources := make(map[schema.GroupVersionResource]bool)
+	var discoveryErr error
+	var discoveryResponse *apidiscoveryv2.APIGroupDiscoveryList
+	discoveryPaths := []string{"/api", "/apis"}
+	for _, path := range discoveryPaths {
+		discoveryResponse, discoveryErr = h.aggregateDiscovery(ctx, path, hostport)
+		if discoveryErr != nil {
+			klog.ErrorS(discoveryErr, "error querying discovery endpoint for serverID", "path", path, "serverID", serverID)
+			continue
+		}
+
+		for _, groupDiscovery := range discoveryResponse.Items {
+			groupName := groupDiscovery.Name
+			if groupName == "" {
+				groupName = "core"
+			}
+
+			for _, version := range groupDiscovery.Versions {
+				for _, resource := range version.Resources {
+					gvr := schema.GroupVersionResource{Group: groupName, Version: version.Version, Resource: resource.Resource}
+					servedResources[gvr] = true
+				}
+			}
+		}
+	}
+
+	klog.V(4).Infof("Agg discovery done successfully by %s for %s", h.serverID, serverID)
+	return servedResources, discoveryErr
+}
+
+func (h *peerProxyHandler) aggregateDiscovery(ctx context.Context, path string, hostport string) (*apidiscoveryv2.APIGroupDiscoveryList, error) {
+	req, err := http.NewRequest(http.MethodGet, path, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	apiServerUser := &user.DefaultInfo{
+		Name:   user.APIServerUser,
+		UID:    user.APIServerUser,
+		Groups: []string{user.AllAuthenticated},
+	}
+
+	ctx = apirequest.WithUser(ctx, apiServerUser)
+	req = req.WithContext(ctx)
+
+	req.Header.Add("Accept", discovery.AcceptV2)
+
+	writer := responsewriterutil.NewInMemoryResponseWriter()
+	h.proxyRequestToDestinationAPIServer(req, writer, hostport)
+	if writer.RespCode() != http.StatusOK {
+		return nil, fmt.Errorf("discovery request failed with status: %d", writer.RespCode())
+	}
+
+	parsed := &apidiscoveryv2.APIGroupDiscoveryList{}
+	if err := runtime.DecodeInto(h.discoverySerializer.UniversalDecoder(), writer.Data(), parsed); err != nil {
+		return nil, fmt.Errorf("error decoding discovery response: %w", err)
+	}
+
+	return parsed, nil
+}
+
+// handleErr checks if an error happened and makes sure we will retry later.
+func (h *peerProxyHandler) handleErr(err error, key string) {
+	if err == nil {
+		h.peerLeaseQueue.Forget(key)
+		return
+	}
+
+	if h.peerLeaseQueue.NumRequeues(key) < maxRetries {
+		klog.Infof("Error syncing discovery for peer lease %v: %v", key, err)
+		h.peerLeaseQueue.AddRateLimited(key)
+		return
+	}
+
+	h.peerLeaseQueue.Forget(key)
+	utilruntime.HandleError(err)
+	klog.Infof("Dropping lease %s out of the queue: %v", key, err)
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go
new file mode 100644
index 00000000000..94cefd6df08
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peer_discovery_test.go
@@ -0,0 +1,344 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peerproxy + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/transport" + + apidiscoveryv2 "k8s.io/api/apidiscovery/v2" + v1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestRunPeerDiscoveryCacheSync(t *testing.T) { + localServerID := "local-server" + + testCases := []struct { + desc string + leases []*v1.Lease + labelSelectorString string + updatedLease *v1.Lease + deletedLeaseNames []string + wantCache map[string]map[schema.GroupVersionResource]bool + }{ + { + desc: "single remote server", + labelSelectorString: "apiserver-identity=testserver", + leases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-1", + Labels: map[string]string{"apiserver-identity": "testserver"}, + }, + Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")}, + }, + }, + wantCache: map[string]map[schema.GroupVersionResource]bool{ + "remote-1": { + {Group: "testgroup", Version: "v1", Resource: "testresources"}: true, + }, + }, + }, + { + desc: "multiple remote servers", + labelSelectorString: "apiserver-identity=testserver", + leases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-1", + Labels: map[string]string{"apiserver-identity": "testserver"}, + }, + Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")}, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-2", + Labels: map[string]string{"apiserver-identity": "testserver"}, + }, + Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-2")}, + }, + }, + wantCache: map[string]map[schema.GroupVersionResource]bool{ + "remote-1": { + {Group: "testgroup", Version: "v1", Resource: "testresources"}: true, + }, + "remote-2": { + {Group: "testgroup", Version: "v1", Resource: "testresources"}: true, + }, + }, + }, + { + desc: "lease update", + labelSelectorString: "apiserver-identity=testserver", + leases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-1", + Labels: map[string]string{"apiserver-identity": "testserver"}, + }, + Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")}, + }, + }, + updatedLease: &v1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-1", + Labels: map[string]string{"apiserver-identity": "testserver"}, + }, + Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-2")}, + }, + wantCache: map[string]map[schema.GroupVersionResource]bool{ + "remote-1": { + {Group: "testgroup", Version: "v1", Resource: "testresources"}: true, + }, + }, + }, + { + desc: "lease deletion", + labelSelectorString: "apiserver-identity=testserver", + leases: []*v1.Lease{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "remote-1", + Labels: 
map[string]string{"apiserver-identity": "testserver"}, + }, + Spec: v1.LeaseSpec{HolderIdentity: proto.String("holder-1")}, + }, + }, + deletedLeaseNames: []string{"remote-1"}, + wantCache: map[string]map[schema.GroupVersionResource]bool{}, + }, + } + + for _, tt := range testCases { + t.Run(tt.desc, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + fakeInformerFactory := informers.NewSharedInformerFactory(fakeClient, 0) + leaseInformer := fakeInformerFactory.Coordination().V1().Leases() + + fakeReconciler := newFakeReconciler() + + negotiatedSerializer := serializer.NewCodecFactory(runtime.NewScheme()) + loopbackConfig := &rest.Config{} + proxyConfig := &transport.Config{ + TLS: transport.TLSConfig{Insecure: true}, + } + + h, err := NewPeerProxyHandler( + localServerID, + tt.labelSelectorString, + leaseInformer, + fakeReconciler, + negotiatedSerializer, + loopbackConfig, + proxyConfig, + ) + if err != nil { + t.Fatalf("failed to create handler: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Add leases to the fake client and informer. + for _, lease := range tt.leases { + _, err := fakeClient.CoordinationV1().Leases("default").Create(ctx, lease, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create lease: %v", err) + } + if err = leaseInformer.Informer().GetIndexer().Add(lease); err != nil { + t.Fatalf("failed to create lease: %v", err) + } + } + + go fakeInformerFactory.Start(ctx.Done()) + cache.WaitForCacheSync(ctx.Done(), leaseInformer.Informer().HasSynced) + + // Create test servers based on leases + testServers := make(map[string]*httptest.Server) + for _, lease := range tt.leases { + testServer := newTestTLSServer(t) + defer testServer.Close() + testServers[lease.Name] = testServer + } + + // Modify the reconciler to return the test server URLs + for name, server := range testServers { + fakeReconciler.setEndpoint(name, server.URL[8:]) + } + + go h.RunPeerDiscoveryCacheSync(ctx, 1) + + // Wait for initial cache update. + initialCache := map[string]map[schema.GroupVersionResource]bool{} + for _, lease := range tt.leases { + initialCache[lease.Name] = map[schema.GroupVersionResource]bool{ + {Group: "testgroup", Version: "v1", Resource: "testresources"}: true, + } + } + err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + gotCache := h.peerDiscoveryInfoCache.Load() + return assert.ObjectsAreEqual(initialCache, gotCache), nil + }) + if err != nil { + t.Errorf("initial cache update failed: %v", err) + } + + // Update the lease if indicated. + if tt.updatedLease != nil { + updatedLease := tt.updatedLease.DeepCopy() + _, err = fakeClient.CoordinationV1().Leases("default").Update(ctx, updatedLease, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to update lease: %v", err) + } + if err = leaseInformer.Informer().GetIndexer().Update(updatedLease); err != nil { + t.Fatalf("failed to update lease: %v", err) + } + } + + // Delete leases if indicated. 
+ if len(tt.deletedLeaseNames) > 0 { + for _, leaseName := range tt.deletedLeaseNames { + lease, exists, err := leaseInformer.Informer().GetIndexer().GetByKey("default/" + leaseName) + if err != nil { + t.Fatalf("failed to get lease from indexer: %v", err) + } + if !exists { + t.Fatalf("lease %s not found", leaseName) + } + deletedLease := lease.(*v1.Lease) + err = fakeClient.CoordinationV1().Leases("default").Delete(ctx, deletedLease.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("failed to delete lease: %v", err) + } + if err = leaseInformer.Informer().GetIndexer().Delete(deletedLease); err != nil { + t.Fatalf("failed to delete lease: %v", err) + } + + } + } + + err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, false, func(ctx context.Context) (bool, error) { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } + gotCache := h.peerDiscoveryInfoCache.Load() + return assert.ObjectsAreEqual(tt.wantCache, gotCache), nil + }) + if err != nil { + t.Errorf("cache doesnt match expectation: %v", err) + } + + }) + } +} + +// newTestTLSServer creates a new httptest.NewTLSServer that serves discovery endpoints. +func newTestTLSServer(t *testing.T) *httptest.Server { + return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/apis" || r.URL.Path == "/api" { + discoveryResponse := &apidiscoveryv2.APIGroupDiscoveryList{ + Items: []apidiscoveryv2.APIGroupDiscovery{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testgroup", + }, + Versions: []apidiscoveryv2.APIVersionDiscovery{ + { + Version: "v1", + Resources: []apidiscoveryv2.APIResourceDiscovery{ + {Resource: "testresources"}, + }, + }, + }, + }, + }, + } + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(discoveryResponse); err != nil { + t.Fatalf("error recording discovery response") + } + } else { + w.WriteHeader(http.StatusNotFound) + } + })) +} + +type fakeReconciler struct { + endpoints map[string]string +} + +func newFakeReconciler() *fakeReconciler { + return &fakeReconciler{ + endpoints: make(map[string]string), + } +} + +func (f *fakeReconciler) UpdateLease(serverID string, publicIP string, ports []corev1.EndpointPort) error { + return nil +} + +func (f *fakeReconciler) DeleteLease(serverID string) error { + return nil +} + +func (f *fakeReconciler) Destroy() { +} + +func (f *fakeReconciler) GetEndpoint(serverID string) (string, error) { + endpoint, ok := f.endpoints[serverID] + if !ok { + return "", fmt.Errorf("endpoint not found for serverID: %s", serverID) + } + return endpoint, nil +} + +func (f *fakeReconciler) RemoveLease(serverID string) error { + return nil +} + +func (f *fakeReconciler) StopReconciling() { +} + +func (f *fakeReconciler) setEndpoint(serverID, endpoint string) { + f.endpoints[serverID] = endpoint +} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go index 7ea4d5c4b47..f682a6c6574 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy.go @@ -17,51 +17,171 @@ limitations under the License. 
package peerproxy import ( + "context" + "fmt" "net/http" - "sync" + "strings" + "sync/atomic" + "time" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apiserver/pkg/reconcilers" - "k8s.io/apiserver/pkg/storageversion" - kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/transport" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + apidiscoveryv2 "k8s.io/api/apidiscovery/v2" + v1 "k8s.io/api/coordination/v1" + schema "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + coordinationv1informers "k8s.io/client-go/informers/coordination/v1" ) -// Interface defines how the Unknown Version Proxy filter interacts with the underlying system. +// Local discovery cache needs to be refreshed periodically to store +// updates made to custom resources or aggregated resource that can +// change dynamically. +const localDiscoveryRefreshInterval = 30 * time.Minute + +// Interface defines how the Mixed Version Proxy filter interacts with the underlying system. type Interface interface { WrapHandler(handler http.Handler) http.Handler WaitForCacheSync(stopCh <-chan struct{}) error HasFinishedSync() bool + RunLocalDiscoveryCacheSync(stopCh <-chan struct{}) error + RunPeerDiscoveryCacheSync(ctx context.Context, workers int) } // New creates a new instance to implement unknown version proxy -func NewPeerProxyHandler(informerFactory kubeinformers.SharedInformerFactory, - svm storageversion.Manager, - proxyTransport http.RoundTripper, +// This method is used for an alpha feature UnknownVersionInteroperabilityProxy +// and is subject to future modifications. 
+func NewPeerProxyHandler( serverId string, + identityLeaseLabelSelector string, + leaseInformer coordinationv1informers.LeaseInformer, reconciler reconcilers.PeerEndpointLeaseReconciler, - serializer runtime.NegotiatedSerializer) *peerProxyHandler { + ser runtime.NegotiatedSerializer, + loopbackClientConfig *rest.Config, + proxyClientConfig *transport.Config, +) (*peerProxyHandler, error) { h := &peerProxyHandler{ - name: "PeerProxyHandler", - storageversionManager: svm, - proxyTransport: proxyTransport, - svMap: sync.Map{}, - serverId: serverId, - reconciler: reconciler, - serializer: serializer, + name: "PeerProxyHandler", + serverID: serverId, + reconciler: reconciler, + serializer: ser, + localDiscoveryInfoCache: atomic.Value{}, + localDiscoveryCacheTicker: time.NewTicker(localDiscoveryRefreshInterval), + localDiscoveryInfoCachePopulated: make(chan struct{}), + peerDiscoveryInfoCache: atomic.Value{}, + peerLeaseQueue: workqueue.NewTypedRateLimitingQueueWithConfig( + workqueue.DefaultTypedControllerRateLimiter[string](), + workqueue.TypedRateLimitingQueueConfig[string]{ + Name: controllerName, + }), + apiserverIdentityInformer: leaseInformer, } - svi := informerFactory.Internal().V1alpha1().StorageVersions() - h.storageversionInformer = svi.Informer() - svi.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + if parts := strings.Split(identityLeaseLabelSelector, "="); len(parts) != 2 { + return nil, fmt.Errorf("invalid identityLeaseLabelSelector provided, must be of the form key=value, received: %v", identityLeaseLabelSelector) + } + selector, err := labels.Parse(identityLeaseLabelSelector) + if err != nil { + return nil, fmt.Errorf("failed to parse label selector: %w", err) + } + h.identityLeaseLabelSelector = selector + + discoveryScheme := runtime.NewScheme() + utilruntime.Must(apidiscoveryv2.AddToScheme(discoveryScheme)) + h.discoverySerializer = serializer.NewCodecFactory(discoveryScheme) + + discoveryClient, err := discovery.NewDiscoveryClientForConfig(loopbackClientConfig) + if err != nil { + return nil, fmt.Errorf("error creating discovery client: %w", err) + } + h.discoveryClient = discoveryClient + h.localDiscoveryInfoCache.Store(map[schema.GroupVersionResource]bool{}) + h.peerDiscoveryInfoCache.Store(map[string]map[schema.GroupVersionResource]bool{}) + + proxyTransport, err := transport.New(proxyClientConfig) + if err != nil { + return nil, fmt.Errorf("failed to create proxy transport: %w", err) + } + h.proxyTransport = proxyTransport + + peerDiscoveryRegistration, err := h.apiserverIdentityInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - h.addSV(obj) + if lease, ok := h.isValidPeerIdentityLease(obj); ok { + h.enqueueLease(lease) + } }, UpdateFunc: func(oldObj, newObj interface{}) { - h.updateSV(oldObj, newObj) + oldLease, oldLeaseOk := h.isValidPeerIdentityLease(oldObj) + newLease, newLeaseOk := h.isValidPeerIdentityLease(newObj) + if oldLeaseOk && newLeaseOk && + oldLease.Name == newLease.Name && *oldLease.Spec.HolderIdentity != *newLease.Spec.HolderIdentity { + h.enqueueLease(newLease) + } }, DeleteFunc: func(obj interface{}) { - h.deleteSV(obj) - }}) - return h + if lease, ok := h.isValidPeerIdentityLease(obj); ok { + h.enqueueLease(lease) + } + }, + }) + if err != nil { + return nil, err + } + + h.leaseRegistration = peerDiscoveryRegistration + return h, nil +} + +func (h *peerProxyHandler) isValidPeerIdentityLease(obj interface{}) (*v1.Lease, bool) { + lease, ok := obj.(*v1.Lease) + if !ok { + tombstone, 
ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return nil, false + } + if lease, ok = tombstone.Obj.(*v1.Lease); !ok { + utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) + return nil, false + } + } + + if lease == nil { + klog.Error(fmt.Errorf("nil lease object provided")) + return nil, false + } + + if h.identityLeaseLabelSelector != nil && h.identityLeaseLabelSelector.String() != "" { + identityLeaseLabel := strings.Split(h.identityLeaseLabelSelector.String(), "=") + if len(identityLeaseLabel) != 2 { + klog.Errorf("invalid identityLeaseLabelSelector format: %s", h.identityLeaseLabelSelector.String()) + return nil, false + } + + if lease.Labels == nil || lease.Labels[identityLeaseLabel[0]] != identityLeaseLabel[1] { + klog.V(4).Infof("lease %s/%s does not match label selector: %s=%s", lease.Namespace, lease.Name, identityLeaseLabel[0], identityLeaseLabel[1]) + return nil, false + } + + } + + // Ignore self. + if lease.Name == h.serverID { + return nil, false + } + + if lease.Spec.HolderIdentity == nil { + klog.Error(fmt.Errorf("invalid lease object provided, missing holderIdentity in lease obj")) + return nil, false + } + + return lease, true } diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go index c1a77791d7f..2894271346f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler.go @@ -25,26 +25,30 @@ import ( "net/http" "net/url" "strconv" - "strings" "sync" "sync/atomic" + "time" - "k8s.io/api/apiserverinternal/v1alpha1" - apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - schema "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/proxy" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" - epmetrics "k8s.io/apiserver/pkg/endpoints/metrics" - apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/responsewriter" "k8s.io/apiserver/pkg/reconcilers" - "k8s.io/apiserver/pkg/storageversion" "k8s.io/apiserver/pkg/util/peerproxy/metrics" - apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy" + "k8s.io/client-go/discovery" "k8s.io/client-go/tools/cache" "k8s.io/client-go/transport" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + schema "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + epmetrics "k8s.io/apiserver/pkg/endpoints/metrics" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy" + coordinationv1informers "k8s.io/client-go/informers/coordination/v1" ) const ( @@ -53,33 +57,34 @@ const ( type peerProxyHandler struct { name string - // StorageVersion informer used to fetch apiserver ids than can serve a resource - storageversionInformer cache.SharedIndexInformer - - // StorageVersion manager used to ensure it has finished updating storageversions before - // we start handling external requests - storageversionManager storageversion.Manager - - // proxy transport - proxyTransport http.RoundTripper - - // identity for this server - serverId string - - // reconciler that is used to fetch host port of peer apiserver when proxying request to a peer - reconciler reconcilers.PeerEndpointLeaseReconciler - - serializer 
runtime.NegotiatedSerializer
-
-	// SyncMap for storing an up to date copy of the storageversions and apiservers that can serve them
-	// This map is populated using the StorageVersion informer
-	// This map has key set to GVR and value being another SyncMap
-	// The nested SyncMap has key set to apiserver id and value set to boolean
-	// The nested maps are created to have a "Set" like structure to store unique apiserver ids
-	// for a given GVR
-	svMap sync.Map
-
+	// Identity for this server.
+	serverID     string
 	finishedSync atomic.Bool
+	// Label to check against in identity leases to make sure
+	// we are working with apiserver identity leases only.
+	identityLeaseLabelSelector labels.Selector
+	apiserverIdentityInformer  coordinationv1informers.LeaseInformer
+	leaseRegistration          cache.ResourceEventHandlerRegistration
+	// Reconciler that is used to fetch host port of peer apiserver when proxying request to a peer.
+	reconciler reconcilers.PeerEndpointLeaseReconciler
+	// Client to make discovery calls locally.
+	discoveryClient     *discovery.DiscoveryClient
+	discoverySerializer serializer.CodecFactory
+	// Cache that stores resources served by this apiserver. Refreshed periodically.
+	// We always look up in the local discovery cache first, to check whether the
+	// request can be served by this apiserver instead of proxying it to a peer.
+	localDiscoveryInfoCache              atomic.Value
+	localDiscoveryCacheTicker            *time.Ticker
+	localDiscoveryInfoCachePopulated     chan struct{}
+	localDiscoveryInfoCachePopulatedOnce sync.Once
+	// Cache that stores resources served by peer apiservers.
+	// Refreshed if a new apiserver identity lease is added or deleted, or a
+	// holderIdentity change is observed in the lease.
+	peerDiscoveryInfoCache atomic.Value
+	proxyTransport         http.RoundTripper
+	// Worker queue that keeps the peerDiscoveryInfoCache up-to-date.
+	peerLeaseQueue workqueue.TypedRateLimitingInterface[string]
+	serializer     runtime.NegotiatedSerializer
 }
 
 // responder implements rest.Responder for assisting a connector in writing objects or errors.
@@ -93,12 +98,22 @@ func (h *peerProxyHandler) HasFinishedSync() bool {
 }
 
 func (h *peerProxyHandler) WaitForCacheSync(stopCh <-chan struct{}) error {
-
-	ok := cache.WaitForNamedCacheSync("unknown-version-proxy", stopCh, h.storageversionInformer.HasSynced, h.storageversionManager.Completed)
+	ok := cache.WaitForNamedCacheSync("mixed-version-proxy", stopCh, h.apiserverIdentityInformer.Informer().HasSynced)
 	if !ok {
 		return fmt.Errorf("error while waiting for initial cache sync")
 	}
-	klog.V(3).Infof("setting finishedSync to true")
+
+	if !cache.WaitForNamedCacheSync(controllerName, stopCh, h.leaseRegistration.HasSynced) {
+		return fmt.Errorf("error while waiting for peer-identity-lease event handler registration sync")
+	}
+
+	// Wait for localDiscoveryInfoCache to be populated.
+	select {
+	case <-h.localDiscoveryInfoCachePopulated:
+	case <-stopCh:
+		return fmt.Errorf("stop signal received while waiting for local discovery cache population")
+	}
+
 	h.finishedSync.Store(true)
 	return nil
 }
@@ -109,7 +124,6 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		ctx := r.Context()
 		requestInfo, ok := apirequest.RequestInfoFrom(ctx)
-
 		if !ok {
 			responsewriters.InternalError(w, r, errors.New("no RequestInfo found in the context"))
 			return
@@ -129,10 +143,9 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			return
 		}
 
-		// StorageVersion Informers and/or StorageVersionManager is not synced yet, pass request to next handler
+		// Apiserver Identity Informer has not synced yet, pass request to next handler
 		// This will happen for self requests from the kube-apiserver because we have a poststarthook
-		// to ensure that external requests are not served until the StorageVersion Informer and
-		// StorageVersionManager has synced
+		// to ensure that external requests are not served until the ApiserverIdentity Informer has synced
 		if !h.HasFinishedSync() {
 			handler.ServeHTTP(w, r)
 			return
@@ -143,15 +156,20 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			gvr.Group = "core"
 		}
 
-		apiservers, err := h.findServiceableByServers(gvr)
-		if err != nil {
-			// resource wasn't found in SV informer cache which means that resource is an aggregated API
-			// or a CR. This situation is ok to be handled by local handler.
+		if h.shouldServeLocally(gvr) {
			handler.ServeHTTP(w, r)
 			return
 		}
 
-		locallyServiceable, peerEndpoints, err := h.resolveServingLocation(apiservers)
+		// find servers that are capable of serving this request
+		peerServerIDs := h.findServiceableByPeerFromPeerDiscoveryCache(gvr)
+		if len(peerServerIDs) == 0 {
+			klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
+			handler.ServeHTTP(w, r)
+			return
+		}
+
+		peerEndpoints, err := h.resolveServingLocation(peerServerIDs)
 		if err != nil {
 			gv := schema.GroupVersion{Group: gvr.Group, Version: gvr.Version}
 			klog.ErrorS(err, "error finding serviceable-by apiservers for the requested resource", "gvr", gvr)
@@ -159,91 +177,145 @@ func (h *peerProxyHandler) WrapHandler(handler http.Handler) http.Handler {
 			return
 		}
 
-		// pass request to the next handler if found the gvr locally.
-		// TODO: maintain locally serviceable GVRs somewhere so that we dont have to
-		// consult the storageversion-informed map for those
-		if locallyServiceable {
-			handler.ServeHTTP(w, r)
-			return
-		}
-
-		if len(peerEndpoints) == 0 {
-			klog.Errorf("gvr %v is not served by anything in this cluster", gvr)
-			handler.ServeHTTP(w, r)
-			return
-		}
-
-		// otherwise, randomly select an apiserver and proxy request to it
 		rand := rand.Intn(len(peerEndpoints))
-		destServerHostPort := peerEndpoints[rand]
-		h.proxyRequestToDestinationAPIServer(r, w, destServerHostPort)
+		peerEndpoint := peerEndpoints[rand]
+		h.proxyRequestToDestinationAPIServer(r, w, peerEndpoint)
 	})
 }
 
-func (h *peerProxyHandler) findServiceableByServers(gvr schema.GroupVersionResource) (*sync.Map, error) {
-	apiserversi, ok := h.svMap.Load(gvr)
-	if !ok || apiserversi == nil {
-		return nil, fmt.Errorf("no storageVersions found for the GVR: %v", gvr)
+// RunLocalDiscoveryCacheSync populates the localDiscoveryInfoCache and
+// starts a goroutine to periodically refresh the local discovery cache.
+func (h *peerProxyHandler) RunLocalDiscoveryCacheSync(stopCh <-chan struct{}) error { + klog.Info("localDiscoveryCacheInvalidation goroutine started") + // Populate the cache initially. + if err := h.populateLocalDiscoveryCache(); err != nil { + return fmt.Errorf("failed to populate initial local discovery cache: %w", err) } - apiservers, _ := apiserversi.(*sync.Map) - return apiservers, nil + go func() { + for { + select { + case <-h.localDiscoveryCacheTicker.C: + klog.V(4).Infof("Invalidating local discovery cache") + if err := h.populateLocalDiscoveryCache(); err != nil { + klog.Errorf("Failed to repopulate local discovery cache: %v", err) + } + case <-stopCh: + klog.Info("localDiscoveryCacheInvalidation goroutine received stop signal") + if h.localDiscoveryCacheTicker != nil { + h.localDiscoveryCacheTicker.Stop() + klog.Info("localDiscoveryCacheTicker stopped") + } + klog.Info("localDiscoveryCacheInvalidation goroutine exiting") + return + } + } + }() + return nil } -func (h *peerProxyHandler) resolveServingLocation(apiservers *sync.Map) (bool, []string, error) { - var peerServerEndpoints []string - var locallyServiceable bool - var respErr error +func (h *peerProxyHandler) populateLocalDiscoveryCache() error { + _, resourcesByGV, _, err := h.discoveryClient.GroupsAndMaybeResources() + if err != nil { + return fmt.Errorf("error getting API group resources from discovery: %w", err) + } - apiservers.Range(func(key, value interface{}) bool { - apiserverKey := key.(string) - if apiserverKey == h.serverId { - locallyServiceable = true - // stop iteration and reset any errors encountered so far. - respErr = nil - return false + freshLocalDiscoveryResponse := map[schema.GroupVersionResource]bool{} + for gv, resources := range resourcesByGV { + if gv.Group == "" { + gv.Group = "core" + } + for _, resource := range resources.APIResources { + gvr := gv.WithResource(resource.Name) + freshLocalDiscoveryResponse[gvr] = true + } + } + + h.localDiscoveryInfoCache.Store(freshLocalDiscoveryResponse) + // Signal that the cache has been populated. + h.localDiscoveryInfoCachePopulatedOnce.Do(func() { + close(h.localDiscoveryInfoCachePopulated) + }) + return nil +} + +// shouldServeLocally checks if the requested resource is present in the local +// discovery cache indicating the request can be served by this server. +func (h *peerProxyHandler) shouldServeLocally(gvr schema.GroupVersionResource) bool { + cache := h.localDiscoveryInfoCache.Load().(map[schema.GroupVersionResource]bool) + exists, ok := cache[gvr] + if !ok { + klog.V(4).Infof("resource not found for %v in local discovery cache\n", gvr.GroupVersion()) + return false + } + + if exists { + return true + } + + return false +} + +func (h *peerProxyHandler) findServiceableByPeerFromPeerDiscoveryCache(gvr schema.GroupVersionResource) []string { + var serviceableByIDs []string + cache := h.peerDiscoveryInfoCache.Load().(map[string]map[schema.GroupVersionResource]bool) + for peerID, servedResources := range cache { + // Ignore local apiserver. + if peerID == h.serverID { + continue } - hostPort, err := h.hostportInfo(apiserverKey) + exists, ok := servedResources[gvr] + if !ok { + continue + } + + if exists { + serviceableByIDs = append(serviceableByIDs, peerID) + } + } + + return serviceableByIDs +} + +// resolveServingLocation resolves the host:port addresses for the given peer IDs. 
+func (h *peerProxyHandler) resolveServingLocation(peerIDs []string) ([]string, error) { + var peerServerEndpoints []string + var errs []error + + for _, id := range peerIDs { + hostPort, err := h.hostportInfo(id) if err != nil { - respErr = err - // continue with iteration - return true + errs = append(errs, err) + continue } peerServerEndpoints = append(peerServerEndpoints, hostPort) - return true - }) + } // reset err if there was atleast one valid peer server found. if len(peerServerEndpoints) > 0 { - respErr = nil + errs = nil } - return locallyServiceable, peerServerEndpoints, respErr + return peerServerEndpoints, errors.Join(errs...) } func (h *peerProxyHandler) hostportInfo(apiserverKey string) (string, error) { - hostport, err := h.reconciler.GetEndpoint(apiserverKey) - if err != nil { - return "", err - } - // check ip format - _, _, err = net.SplitHostPort(hostport) + hostPort, err := h.reconciler.GetEndpoint(apiserverKey) if err != nil { return "", err } - return hostport, nil + _, _, err = net.SplitHostPort(hostPort) + if err != nil { + return "", err + } + + return hostPort, nil } func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, rw http.ResponseWriter, host string) { - user, ok := apirequest.UserFrom(req.Context()) - if !ok { - klog.Error("failed to get user info from request") - return - } - // write a new location based on the existing request pointed at the target service location := &url.URL{} location.Scheme = "https" @@ -255,107 +327,29 @@ func (h *peerProxyHandler) proxyRequestToDestinationAPIServer(req *http.Request, newReq.Header.Add(PeerProxiedHeader, "true") defer cancelFn() - proxyRoundTripper := transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport) + proxyRoundTripper, err := h.buildProxyRoundtripper(req) + if err != nil { + klog.Errorf("failed to build proxy round tripper: %v", err) + return + } + delegate := &epmetrics.ResponseWriterDelegator{ResponseWriter: rw} w := responsewriter.WrapForHTTP1Or2(delegate) - handler := proxy.NewUpgradeAwareHandler(location, proxyRoundTripper, true, false, &responder{w: w, ctx: req.Context()}) handler.ServeHTTP(w, newReq) metrics.IncPeerProxiedRequest(req.Context(), strconv.Itoa(delegate.Status())) } +func (h *peerProxyHandler) buildProxyRoundtripper(req *http.Request) (http.RoundTripper, error) { + user, ok := apirequest.UserFrom(req.Context()) + if !ok { + return nil, apierrors.NewBadRequest("no user details present in request") + } + + return transport.NewAuthProxyRoundTripper(user.GetName(), user.GetUID(), user.GetGroups(), user.GetExtra(), h.proxyTransport), nil +} + func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { klog.ErrorS(err, "Error while proxying request to destination apiserver") http.Error(w, err.Error(), http.StatusServiceUnavailable) } - -// Adds a storageversion object to SVMap -func (h *peerProxyHandler) addSV(obj interface{}) { - sv, ok := obj.(*v1alpha1.StorageVersion) - if !ok { - klog.Error("Invalid StorageVersion provided to addSV()") - return - } - h.updateSVMap(nil, sv) -} - -// Updates the SVMap to delete old storageversion and add new storageversion -func (h *peerProxyHandler) updateSV(oldObj interface{}, newObj interface{}) { - oldSV, ok := oldObj.(*v1alpha1.StorageVersion) - if !ok { - klog.Error("Invalid StorageVersion provided to updateSV()") - return - } - - newSV, ok := newObj.(*v1alpha1.StorageVersion) - if !ok { - klog.Error("Invalid StorageVersion provided to 
updateSV()") - return - } - - h.updateSVMap(oldSV, newSV) -} - -// Deletes a storageversion object from SVMap -func (h *peerProxyHandler) deleteSV(obj interface{}) { - sv, ok := obj.(*v1alpha1.StorageVersion) - if !ok { - klog.Error("Invalid StorageVersion provided to deleteSV()") - return - } - - h.updateSVMap(sv, nil) -} - -// Delete old storageversion, add new storagversion -func (h *peerProxyHandler) updateSVMap(oldSV *v1alpha1.StorageVersion, newSV *v1alpha1.StorageVersion) { - if oldSV != nil { - h.deleteSVFromMap(oldSV) - } - - if newSV != nil { - h.addSVToMap(newSV) - } -} - -func (h *peerProxyHandler) deleteSVFromMap(sv *v1alpha1.StorageVersion) { - // The name of storageversion is . - splitInd := strings.LastIndex(sv.Name, ".") - group := sv.Name[:splitInd] - resource := sv.Name[splitInd+1:] - - gvr := schema.GroupVersionResource{Group: group, Resource: resource} - for _, gr := range sv.Status.StorageVersions { - for _, version := range gr.ServedVersions { - versionSplit := strings.Split(version, "/") - if len(versionSplit) == 2 { - version = versionSplit[1] - } - gvr.Version = version - h.svMap.Delete(gvr) - } - } -} - -func (h *peerProxyHandler) addSVToMap(sv *v1alpha1.StorageVersion) { - // The name of storageversion is . - splitInd := strings.LastIndex(sv.Name, ".") - group := sv.Name[:splitInd] - resource := sv.Name[splitInd+1:] - - gvr := schema.GroupVersionResource{Group: group, Resource: resource} - for _, gr := range sv.Status.StorageVersions { - for _, version := range gr.ServedVersions { - - // some versions have groups included in them, so get rid of the groups - versionSplit := strings.Split(version, "/") - if len(versionSplit) == 2 { - version = versionSplit[1] - } - gvr.Version = version - apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{}) - apiservers := apiserversi.(*sync.Map) - apiservers.Store(gr.APIServerID, true) - } - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go index 4fb45f30243..f313678d49d 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go +++ b/staging/src/k8s.io/apiserver/pkg/util/peerproxy/peerproxy_handler_test.go @@ -19,7 +19,6 @@ package peerproxy import ( "net/http" "strings" - "sync" "testing" "time" @@ -27,42 +26,39 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/apitesting" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/authentication/user" - apifilters "k8s.io/apiserver/pkg/endpoints/filters" - apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/reconcilers" - etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" - "k8s.io/apiserver/pkg/storageversion" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/peerproxy/metrics" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/rest" "k8s.io/client-go/transport" - featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/metrics/testutil" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime 
"k8s.io/apimachinery/pkg/util/runtime" + apifilters "k8s.io/apiserver/pkg/endpoints/filters" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" ) const ( - requestTimeout = 30 * time.Second - localServerID = "local-apiserver" - remoteServerID = "remote-apiserver" + requestTimeout = 30 * time.Second + localServerID = "local-apiserver" + remoteServerID1 = "remote-apiserver-1" + remoteServerID2 = "remote-apiserver-2" ) -type FakeSVMapData struct { - gvr schema.GroupVersionResource - serverIDs []string -} - type server struct { publicIP string serverID string @@ -76,205 +72,138 @@ type reconciler struct { func TestPeerProxy(t *testing.T) { testCases := []struct { desc string - svdata FakeSVMapData informerFinishedSync bool requestPath string peerproxiedHeader string - expectedStatus int - metrics []string - want string reconcilerConfig reconciler + localCache map[schema.GroupVersionResource]bool + peerCache map[string]map[schema.GroupVersionResource]bool + wantStatus int + wantMetricsData string }{ { - desc: "allow non resource requests", - requestPath: "/foo/bar/baz", - expectedStatus: http.StatusOK, + desc: "allow non resource requests", + requestPath: "/foo/bar/baz", + wantStatus: http.StatusOK, }, { desc: "allow if already proxied once", requestPath: "/api/bar/baz", - expectedStatus: http.StatusOK, peerproxiedHeader: "true", + wantStatus: http.StatusOK, }, { desc: "allow if unsynced informers", requestPath: "/api/bar/baz", - expectedStatus: http.StatusOK, informerFinishedSync: false, + wantStatus: http.StatusOK, }, { - desc: "allow if no storage version found", - requestPath: "/api/bar/baz", - expectedStatus: http.StatusOK, - informerFinishedSync: true, + desc: "Serve locally if serviceable", + requestPath: "/api/foo/bar", + localCache: map[schema.GroupVersionResource]bool{ + {Group: "core", Version: "foo", Resource: "bar"}: true, + }, + wantStatus: http.StatusOK, }, { - // since if no server id is found, we pass request to next handler - //, and our last handler in local chain is an http ok handler - desc: "200 if no serverid found", - requestPath: "/api/bar/baz", - expectedStatus: http.StatusOK, + desc: "200 if no appropriate peers found, serve locally", + requestPath: "/api/foo/bar", informerFinishedSync: true, - svdata: FakeSVMapData{ - gvr: schema.GroupVersionResource{ - Group: "core", - Version: "bar", - Resource: "baz"}, - serverIDs: []string{}}, + wantStatus: http.StatusOK, }, { desc: "503 if no endpoint fetched from lease", requestPath: "/api/foo/bar", - expectedStatus: http.StatusServiceUnavailable, informerFinishedSync: true, - svdata: FakeSVMapData{ - gvr: schema.GroupVersionResource{ - Group: "core", - Version: "foo", - Resource: "bar"}, - serverIDs: []string{remoteServerID}}, - }, - { - desc: "200 if locally serviceable", - requestPath: "/api/foo/bar", - expectedStatus: http.StatusOK, - informerFinishedSync: true, - svdata: FakeSVMapData{ - gvr: schema.GroupVersionResource{ - Group: "core", - Version: "foo", - Resource: "bar"}, - serverIDs: []string{localServerID}}, + peerCache: map[string]map[schema.GroupVersionResource]bool{ + remoteServerID1: { + {Group: "core", Version: "foo", Resource: "bar"}: true, + }, + }, + wantStatus: http.StatusServiceUnavailable, }, { desc: "503 unreachable peer bind address", requestPath: "/api/foo/bar", - expectedStatus: http.StatusServiceUnavailable, 
             informerFinishedSync: true,
-            svdata: FakeSVMapData{
-                gvr: schema.GroupVersionResource{
-                    Group:    "core",
-                    Version:  "foo",
-                    Resource: "bar"},
-                serverIDs: []string{remoteServerID}},
+            peerCache: map[string]map[schema.GroupVersionResource]bool{
+                remoteServerID1: {
+                    {Group: "core", Version: "foo", Resource: "bar"}: true,
+                },
+            },
             reconcilerConfig: reconciler{
                 do: true,
                 servers: []server{
                     {
                         publicIP: "1.2.3.4",
-                        serverID: remoteServerID,
+                        serverID: remoteServerID1,
                     },
                 },
             },
-            metrics: []string{
-                "apiserver_rerouted_request_total",
-            },
-            want: `
-            # HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
-            # TYPE apiserver_rerouted_request_total counter
-            apiserver_rerouted_request_total{code="503"} 1
-            `,
-        },
-        {
-            desc:                 "503 unreachable peer public address",
-            requestPath:          "/api/foo/bar",
-            expectedStatus:       http.StatusServiceUnavailable,
-            informerFinishedSync: true,
-            svdata: FakeSVMapData{
-                gvr: schema.GroupVersionResource{
-                    Group:    "core",
-                    Version:  "foo",
-                    Resource: "bar"},
-                serverIDs: []string{remoteServerID}},
-            reconcilerConfig: reconciler{
-                do: true,
-                servers: []server{
-                    {
-                        publicIP: "1.2.3.4",
-                        serverID: remoteServerID,
-                    },
-                },
-            },
-            metrics: []string{
-                "apiserver_rerouted_request_total",
-            },
-            want: `
-            # HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
-            # TYPE apiserver_rerouted_request_total counter
-            apiserver_rerouted_request_total{code="503"} 2
-            `,
+            wantStatus: http.StatusServiceUnavailable,
+            wantMetricsData: `
+            # HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
+            # TYPE apiserver_rerouted_request_total counter
+            apiserver_rerouted_request_total{code="503"} 1
+            `,
         },
         {
             desc:                 "503 if one apiserver's endpoint lease wasnt found but another valid (unreachable) apiserver was found",
             requestPath:          "/api/foo/bar",
-            expectedStatus:       http.StatusServiceUnavailable,
             informerFinishedSync: true,
-            svdata: FakeSVMapData{
-                gvr: schema.GroupVersionResource{
-                    Group:    "core",
-                    Version:  "foo",
-                    Resource: "bar"},
-                serverIDs: []string{"aggregated-apiserver", remoteServerID}},
+            peerCache: map[string]map[schema.GroupVersionResource]bool{
+                remoteServerID1: {
+                    {Group: "core", Version: "foo", Resource: "bar"}: true,
+                },
+                remoteServerID2: {
+                    {Group: "core", Version: "foo", Resource: "bar"}: true,
+                },
+            },
             reconcilerConfig: reconciler{
                 do: true,
                 servers: []server{
                     {
                         publicIP: "1.2.3.4",
-                        serverID: remoteServerID,
-                    },
-                },
-            },
-        },
-        {
-            desc:                 "503 if all peers had invalid host:port info",
-            requestPath:          "/api/foo/bar",
-            expectedStatus:       http.StatusServiceUnavailable,
-            informerFinishedSync: true,
-            svdata: FakeSVMapData{
-                gvr: schema.GroupVersionResource{
-                    Group:    "core",
-                    Version:  "foo",
-                    Resource: "bar"},
-                serverIDs: []string{"aggregated-apiserver", remoteServerID}},
-            reconcilerConfig: reconciler{
-                do: true,
-                servers: []server{
-                    {
-                        publicIP: "1[2.4",
-                        serverID: "aggregated-apiserver",
-                    },
-                    {
-                        publicIP: "2.4]6",
-                        serverID: remoteServerID,
+                        serverID: remoteServerID1,
                     },
                 },
             },
+            wantStatus: http.StatusServiceUnavailable,
+            wantMetricsData: `
+            # HELP apiserver_rerouted_request_total [ALPHA] Total number of requests that were proxied to a peer kube apiserver because the local apiserver was not capable of serving it
+            # TYPE apiserver_rerouted_request_total counter
+            apiserver_rerouted_request_total{code="503"} 1
+            `,
         },
     }
 
     metrics.Register()
     for _, tt := range testCases {
         t.Run(tt.desc, func(t *testing.T) {
+            defer metrics.Reset()
             lastHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                 w.Write([]byte("OK"))
             })
-            reconciler := newFakePeerEndpointReconciler(t)
-            handler := newHandlerChain(t, lastHandler, reconciler, tt.informerFinishedSync, tt.svdata)
+            serverIDs := []string{localServerID}
+            for peerID := range tt.peerCache {
+                serverIDs = append(serverIDs, peerID)
+            }
+            fakeReconciler := newFakePeerEndpointReconciler(t)
+            handler := newHandlerChain(t, tt.informerFinishedSync, lastHandler, fakeReconciler, tt.localCache, tt.peerCache)
             server, requestGetter := createHTTP2ServerWithClient(handler, requestTimeout*2)
             defer server.Close()
 
             if tt.reconcilerConfig.do {
-                // need to enable feature flags first
                 featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
-                featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)
-
-                for _, server := range tt.reconcilerConfig.servers {
-                    err := reconciler.UpdateLease(server.serverID,
-                        server.publicIP,
+                for _, s := range tt.reconcilerConfig.servers {
+                    err := fakeReconciler.UpdateLease(s.serverID,
+                        s.publicIP,
                         []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}})
                     if err != nil {
-                        t.Fatalf("failed to update peer endpoint lease - %v", err)
+                        t.Errorf("Failed to update lease for server %s", s.serverID)
                    }
                 }
             }
@@ -285,17 +214,14 @@ func TestPeerProxy(t *testing.T) {
             }
             req.Header.Set(PeerProxiedHeader, tt.peerproxiedHeader)
 
-            resp, err := requestGetter(req)
-            if err != nil {
-                t.Fatalf("unexpected error trying to get the request: %v", err)
-            }
+            resp, _ := requestGetter(req)
 
             // compare response
-            assert.Equal(t, tt.expectedStatus, resp.StatusCode)
+            assert.Equal(t, tt.wantStatus, resp.StatusCode)
 
             // compare metric
-            if tt.want != "" {
-                if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.want), tt.metrics...); err != nil {
+            if tt.wantMetricsData != "" {
+                if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, strings.NewReader(tt.wantMetricsData), []string{"apiserver_rerouted_request_total"}...); err != nil {
                     t.Fatal(err)
                 }
             }
@@ -324,10 +250,12 @@ func newFakePeerEndpointReconciler(t *testing.T) reconcilers.PeerEndpointLeaseRe
     return reconciler
 }
 
-func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.PeerEndpointLeaseReconciler, informerFinishedSync bool, svdata FakeSVMapData) http.Handler {
+func newHandlerChain(t *testing.T, informerFinishedSync bool, handler http.Handler,
+    reconciler reconcilers.PeerEndpointLeaseReconciler,
+    localCache map[schema.GroupVersionResource]bool, peerCache map[string]map[schema.GroupVersionResource]bool) http.Handler {
     // Add peerproxy handler
     s := serializer.NewCodecFactory(runtime.NewScheme()).WithoutConversion()
-    peerProxyHandler, err := newFakePeerProxyHandler(reconciler, svdata, localServerID, s)
+    peerProxyHandler, err := newFakePeerProxyHandler(informerFinishedSync, reconciler, localServerID, s, localCache, peerCache)
     if err != nil {
         t.Fatalf("Error creating peer proxy handler: %v", err)
     }
@@ -343,36 +271,28 @@ func newHandlerChain(t *testing.T, handler http.Handler, reconciler reconcilers.
     return handler
 }
 
-func newFakePeerProxyHandler(reconciler reconcilers.PeerEndpointLeaseReconciler, svdata FakeSVMapData, id string, s runtime.NegotiatedSerializer) (*peerProxyHandler, error) {
+func newFakePeerProxyHandler(informerFinishedSync bool,
+    reconciler reconcilers.PeerEndpointLeaseReconciler, id string, s runtime.NegotiatedSerializer,
+    localCache map[schema.GroupVersionResource]bool, peerCache map[string]map[schema.GroupVersionResource]bool) (*peerProxyHandler, error) {
     clientset := fake.NewSimpleClientset()
     informerFactory := informers.NewSharedInformerFactory(clientset, 0)
+    leaseInformer := informerFactory.Coordination().V1().Leases()
     clientConfig := &transport.Config{
         TLS: transport.TLSConfig{
             Insecure: false,
         }}
-    proxyRoundTripper, err := transport.New(clientConfig)
+    loopbackClientConfig := &rest.Config{
+        Host: "localhost:1010",
+    }
+    ppH, err := NewPeerProxyHandler(id, "identity=testserver", leaseInformer, reconciler, s, loopbackClientConfig, clientConfig)
     if err != nil {
         return nil, err
     }
-    ppI := NewPeerProxyHandler(informerFactory, storageversion.NewDefaultManager(), proxyRoundTripper, id, reconciler, s)
-    if testDataExists(svdata.gvr) {
-        ppI.addToStorageVersionMap(svdata.gvr, svdata.serverIDs)
-    }
-    return ppI, nil
-}
+    ppH.localDiscoveryInfoCache.Store(localCache)
+    ppH.peerDiscoveryInfoCache.Store(peerCache)
 
-func (h *peerProxyHandler) addToStorageVersionMap(gvr schema.GroupVersionResource, serverIDs []string) {
-    apiserversi, _ := h.svMap.LoadOrStore(gvr, &sync.Map{})
-    apiservers := apiserversi.(*sync.Map)
-    for _, serverID := range serverIDs {
-        if serverID != "" {
-            apiservers.Store(serverID, true)
-        }
-    }
-}
-
-func testDataExists(gvr schema.GroupVersionResource) bool {
-    return gvr.Group != "" && gvr.Version != "" && gvr.Resource != ""
+    ppH.finishedSync.Store(informerFinishedSync)
+    return ppH, nil
 }
 
 func withFakeUser(handler http.Handler) http.Handler {
diff --git a/test/integration/apiserver/peerproxy/peer_proxy_test.go b/test/integration/apiserver/peerproxy/peer_proxy_test.go
index 705489a44bf..522ed49d66e 100644
--- a/test/integration/apiserver/peerproxy/peer_proxy_test.go
+++ b/test/integration/apiserver/peerproxy/peer_proxy_test.go
@@ -31,14 +31,12 @@ import (
     "k8s.io/apiserver/pkg/features"
     "k8s.io/apiserver/pkg/server"
     utilfeature "k8s.io/apiserver/pkg/util/feature"
-    "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/transport"
     "k8s.io/client-go/util/cert"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
     "k8s.io/klog/v2"
     kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
-    "k8s.io/kubernetes/pkg/controller/storageversiongc"
     controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
 
     kubefeatures "k8s.io/kubernetes/pkg/features"
@@ -48,7 +46,6 @@ import (
 )
 
 func TestPeerProxiedRequest(t *testing.T) {
-    ktesting.SetDefaultVerbosity(1)
 
     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
     defer func() {
@@ -60,7 +57,6 @@ func TestPeerProxiedRequest(t *testing.T) {
     // enable feature flags
     featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
-    featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)
     featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)
 
     // create sharedetcd
@@ -111,7 +107,6 @@ func TestPeerProxiedRequest(t *testing.T) {
 }
 
 func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
-    ktesting.SetDefaultVerbosity(1)
 
     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
     defer func() {
@@ -123,7 +118,6 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
     // enable feature flags
     featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)
-    featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)
     featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)
 
     // create sharedetcd
@@ -133,8 +127,7 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
     proxyCA, err := createProxyCertContent()
     require.NoError(t, err)
 
-    // set lease duration to 1s for serverA to ensure that storageversions for serverA are updated
-    // once it is shutdown
+    // modify lease parameters so that they are garbage collected timely.
     controlplaneapiserver.IdentityLeaseDurationSeconds = 10
     controlplaneapiserver.IdentityLeaseGCPeriod = 2 * time.Second
     controlplaneapiserver.IdentityLeaseRenewIntervalPeriod = time.Second
@@ -146,10 +139,7 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
     serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{"--runtime-config=api/all=true"}, etcd)
     kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig)
     require.NoError(t, err)
-    // ensure storageversion garbage collector ctlr is set up
-    informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second)
-    informersACtx, informersACancel := context.WithCancel(ctx)
-    setupStorageVersionGC(informersACtx, kubeClientSetA, informersA)
+
     // reset lease duration to default value for serverB and serverC since we will not be
     // shutting these down
     controlplaneapiserver.IdentityLeaseDurationSeconds = 3600
@@ -163,9 +153,6 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
     t.Cleanup(serverB.TearDownFn)
     kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig)
     require.NoError(t, err)
-    // ensure storageversion garbage collector ctlr is set up
-    informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second)
-    setupStorageVersionGC(ctx, kubeClientSetB, informersB)
 
     // start serverC with all APIs enabled
     // override hostname to ensure unique ips
@@ -181,7 +168,6 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
     klog.Infof("\nServerA has created jobs\n")
 
     // shutdown serverA
-    informersACancel()
     serverA.TearDownFn()
 
     var jobsB *v1.JobList
@@ -212,16 +198,6 @@ func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) {
     assert.Equal(t, job.Name, jobsB.Items[0].Name)
 }
 
-func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) {
-    leaseInformer := informers.Coordination().V1().Leases()
-    storageVersionInformer := informers.Internal().V1alpha1().StorageVersions()
-    go leaseInformer.Informer().Run(ctx.Done())
-    go storageVersionInformer.Informer().Run(ctx.Done())
-
-    controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer)
-    go controller.Run(ctx)
-}
-
 func createProxyCertContent() (kastesting.ProxyCA, error) {
     result := kastesting.ProxyCA{}
     proxySigningKey, err := testutil.NewPrivateKey()
diff --git a/test/integration/controlplane/apiserver_identity_test.go b/test/integration/controlplane/apiserver_identity_test.go
index 8043250f7f6..5abddd6fb57 100644
--- a/test/integration/controlplane/apiserver_identity_test.go
+++ b/test/integration/controlplane/apiserver_identity_test.go
@@ -38,7 +38,6 @@ import (
     "k8s.io/client-go/kubernetes"
     featuregatetesting "k8s.io/component-base/featuregate/testing"
     kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
-    "k8s.io/kubernetes/pkg/controlplane"
     controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
     "k8s.io/kubernetes/test/integration/framework"
     "k8s.io/utils/pointer"
@@ -85,7 +84,7 @@ func TestCreateLeaseOnStart(t *testing.T) {
         leases, err := kubeclient.
             CoordinationV1().
             Leases(metav1.NamespaceSystem).
-            List(context.TODO(), metav1.ListOptions{LabelSelector: controlplaneapiserver.IdentityLeaseComponentLabelKey + "=" + controlplane.KubeAPIServer})
+            List(context.TODO(), metav1.ListOptions{LabelSelector: controlplaneapiserver.IdentityLeaseComponentLabelKey + "=" + controlplaneapiserver.KubeAPIServer})
         if err != nil {
             return false, err
         }
@@ -207,7 +206,7 @@ func newTestLease(acquireTime time.Time, namespace string) *coordinationv1.Lease
             Name:      testLeaseName,
             Namespace: namespace,
             Labels: map[string]string{
-                controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
+                controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplaneapiserver.KubeAPIServer,
             },
         },
         Spec: coordinationv1.LeaseSpec{
diff --git a/test/integration/storageversion/gc_test.go b/test/integration/storageversion/gc_test.go
index af521415714..e5173520b1f 100644
--- a/test/integration/storageversion/gc_test.go
+++ b/test/integration/storageversion/gc_test.go
@@ -36,7 +36,6 @@ import (
     "k8s.io/klog/v2/ktesting"
     kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
     "k8s.io/kubernetes/pkg/controller/storageversiongc"
-    "k8s.io/kubernetes/pkg/controlplane"
     controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
     "k8s.io/kubernetes/test/integration/framework"
     "k8s.io/utils/pointer"
@@ -177,7 +176,7 @@ func createTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface,
             Name:      name,
             Namespace: metav1.NamespaceSystem,
             Labels: map[string]string{
-                controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer,
+                controlplaneapiserver.IdentityLeaseComponentLabelKey: controlplaneapiserver.KubeAPIServer,
             },
         },
         Spec: coordinationv1.LeaseSpec{