From c5095069a8285d9a7e2de26c16a77250262d6e1b Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 17 Jul 2024 10:19:49 +0200 Subject: [PATCH 1/5] aggregator: separate out status controller metrics Signed-off-by: Dr. Stefan Schimanski --- .../pkg/apiserver/apiserver.go | 15 ++++++ .../status/available_controller.go | 48 +++---------------- .../status/available_controller_test.go | 7 +-- .../status/{ => metrics}/metrics.go | 39 ++++++++++++--- .../status/{ => metrics}/metrics_test.go | 2 +- 5 files changed, 59 insertions(+), 52 deletions(-) rename staging/src/k8s.io/kube-aggregator/pkg/controllers/status/{ => metrics}/metrics.go (72%) rename staging/src/k8s.io/kube-aggregator/pkg/controllers/status/{ => metrics}/metrics_test.go (98%) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 525597c3ad0..18ce7e95345 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "sync" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -37,6 +38,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" "k8s.io/client-go/transport" + "k8s.io/component-base/metrics/legacyregistry" "k8s.io/component-base/tracing" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" @@ -50,10 +52,14 @@ import ( openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3" openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator" statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status" + availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest" openapicommon "k8s.io/kube-openapi/pkg/common" ) +// making sure we only register metrics once into legacy registry +var registerIntoLegacyRegistryOnce sync.Once + func init() { // we need to add the options (like ListOptions) to empty v1 metav1.AddToGroupVersion(aggregatorscheme.Scheme, schema.GroupVersion{Group: "", Version: "v1"}) @@ -314,6 +320,14 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg }) } + // create shared (remote and local) availability metrics + // TODO: decouple from legacyregistry + metrics := availabilitymetrics.New() + registerIntoLegacyRegistryOnce.Do(func() { err = metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister) }) + if err != nil { + return nil, err + } + // If the AvailableConditionController is disabled, we don't need to start the informers // and the controller. if !c.ExtraConfig.DisableAvailableConditionController { @@ -325,6 +339,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg proxyTransportDial, (func() ([]byte, []byte))(s.proxyCurrentCertKeyContent), s.serviceResolver, + metrics, ) if err != nil { return nil, err diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index 14cd194aec4..62fcae064fa 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -39,7 +39,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/transport" "k8s.io/client-go/util/workqueue" - "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" @@ -47,11 +46,9 @@ import ( informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" "k8s.io/kube-aggregator/pkg/controllers" + availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" ) -// making sure we only register metrics once into legacy registry -var registerIntoLegacyRegistryOnce sync.Once - type certKeyFunc func() ([]byte, []byte) // ServiceResolver knows how to convert a service reference into an actual location. @@ -88,7 +85,7 @@ type AvailableConditionController struct { cacheLock sync.RWMutex // metrics registered into legacy registry - metrics *availabilityMetrics + metrics *availabilitymetrics.Metrics } // NewAvailableConditionController returns a new AvailableConditionController. @@ -100,6 +97,7 @@ func NewAvailableConditionController( proxyTransportDial *transport.DialHolder, proxyCurrentCertKeyContent certKeyFunc, serviceResolver ServiceResolver, + metrics *availabilitymetrics.Metrics, ) (*AvailableConditionController, error) { c := &AvailableConditionController{ apiServiceClient: apiServiceClient, @@ -116,7 +114,7 @@ func NewAvailableConditionController( ), proxyTransportDial: proxyTransportDial, proxyCurrentCertKeyContent: proxyCurrentCertKeyContent, - metrics: newAvailabilityMetrics(), + metrics: metrics, } // resync on this one because it is low cardinality and rechecking the actual discovery @@ -148,15 +146,6 @@ func NewAvailableConditionController( c.syncFn = c.sync - // TODO: decouple from legacyregistry - var err error - registerIntoLegacyRegistryOnce.Do(func() { - err = c.metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister) - }) - if err != nil { - return nil, err - } - return c, nil } @@ -385,7 +374,7 @@ func (c *AvailableConditionController) sync(key string) error { // apiservices. Doing that means we don't want to quickly issue no-op updates. func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) { // update this metric on every sync operation to reflect the actual state - c.setUnavailableGauge(newAPIService) + c.metrics.SetUnavailableGauge(newAPIService) if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) { return newAPIService, nil @@ -412,7 +401,7 @@ func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService return nil, err } - c.setUnavailableCounter(originalAPIService, newAPIService) + c.metrics.SetUnavailableCounter(originalAPIService, newAPIService) return newAPIService, nil } @@ -599,28 +588,3 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) { c.queue.Add(apiService) } } - -// setUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service -func (c *AvailableConditionController) setUnavailableGauge(newAPIService *apiregistrationv1.APIService) { - if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) { - c.metrics.SetAPIServiceAvailable(newAPIService.Name) - return - } - - c.metrics.SetAPIServiceUnavailable(newAPIService.Name) -} - -// setUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed -func (c *AvailableConditionController) setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) { - wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available) - isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) - statusChanged := isAvailable != wasAvailable - - if statusChanged && !isAvailable { - reason := "UnknownReason" - if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil { - reason = newCondition.Reason - } - c.metrics.UnavailableCounter(newAPIService.Name, reason).Inc() - } -} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go index d95005b0116..321578df226 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go @@ -25,6 +25,7 @@ import ( "testing" "time" + availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" "k8s.io/utils/pointer" v1 "k8s.io/api/core/v1" @@ -133,7 +134,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second), workqueue.TypedRateLimitingQueueConfig[string]{Name: "AvailableConditionController"}, ), - metrics: newAvailabilityMetrics(), + metrics: availabilitymetrics.New(), } for _, svc := range apiServices { c.addAPIService(svc) @@ -395,7 +396,7 @@ func TestSync(t *testing.T) { endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer), serviceResolver: &fakeServiceResolver{url: testServer.URL}, proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, - metrics: newAvailabilityMetrics(), + metrics: availabilitymetrics.New(), } c.sync(tc.apiServiceName) @@ -447,7 +448,7 @@ func TestUpdateAPIServiceStatus(t *testing.T) { fakeClient := fake.NewSimpleClientset() c := AvailableConditionController{ apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), - metrics: newAvailabilityMetrics(), + metrics: availabilitymetrics.New(), } c.updateAPIServiceStatus(foo, foo) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics/metrics.go similarity index 72% rename from staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go rename to staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics/metrics.go index b0653f5988b..ebde3675976 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics/metrics.go @@ -14,12 +14,14 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver +package metrics import ( "sync" "k8s.io/component-base/metrics" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" ) /* @@ -41,14 +43,14 @@ var ( ) ) -type availabilityMetrics struct { +type Metrics struct { unavailableCounter *metrics.CounterVec *availabilityCollector } -func newAvailabilityMetrics() *availabilityMetrics { - return &availabilityMetrics{ +func New() *Metrics { + return &Metrics{ unavailableCounter: metrics.NewCounterVec( &metrics.CounterOpts{ Name: "aggregator_unavailable_apiservice_total", @@ -62,7 +64,7 @@ func newAvailabilityMetrics() *availabilityMetrics { } // Register registers apiservice availability metrics. -func (m *availabilityMetrics) Register( +func (m *Metrics) Register( registrationFunc func(metrics.Registerable) error, customRegistrationFunc func(metrics.StableCollector) error, ) error { @@ -80,7 +82,7 @@ func (m *availabilityMetrics) Register( } // UnavailableCounter returns a counter to track apiservices marked as unavailable. -func (m *availabilityMetrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric { +func (m *Metrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric { return m.unavailableCounter.WithLabelValues(apiServiceName, reason) } @@ -91,6 +93,31 @@ type availabilityCollector struct { availabilities map[string]bool } +// SetUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service +func (m *Metrics) SetUnavailableGauge(newAPIService *apiregistrationv1.APIService) { + if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) { + m.SetAPIServiceAvailable(newAPIService.Name) + return + } + + m.SetAPIServiceUnavailable(newAPIService.Name) +} + +// SetUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed +func (m *Metrics) SetUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) { + wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available) + isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) + statusChanged := isAvailable != wasAvailable + + if statusChanged && !isAvailable { + reason := "UnknownReason" + if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil { + reason = newCondition.Reason + } + m.UnavailableCounter(newAPIService.Name, reason).Inc() + } +} + // Check if apiServiceStatusCollector implements necessary interface. var _ metrics.StableCollector = &availabilityCollector{} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics/metrics_test.go similarity index 98% rename from staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go rename to staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics/metrics_test.go index 8205bb010c2..fc8876bb92b 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/metrics/metrics_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver +package metrics import ( "strings" From b5759ad4f9fcd80b21ce3b8107f2b5daf4e6c603 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Sun, 21 Jul 2024 10:53:47 +0200 Subject: [PATCH 2/5] aggregator: (pre-)move availability controller Signed-off-by: Dr. Stefan Schimanski --- staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go | 2 +- .../remote_available_controller.go} | 2 +- .../remote_available_controller_test.go} | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename staging/src/k8s.io/kube-aggregator/pkg/controllers/status/{available_controller.go => remote/remote_available_controller.go} (99%) rename staging/src/k8s.io/kube-aggregator/pkg/controllers/status/{available_controller_test.go => remote/remote_available_controller_test.go} (99%) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 18ce7e95345..82d67bf80d1 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -51,8 +51,8 @@ import ( openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3" openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator" - statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status" availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" + statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status/remote" apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest" openapicommon "k8s.io/kube-openapi/pkg/common" ) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go similarity index 99% rename from staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go rename to staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go index 62fcae064fa..baced64a369 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver +package remote import ( "context" diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go similarity index 99% rename from staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go rename to staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go index 321578df226..edd330cfe65 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package apiserver +package remote import ( "fmt" From bbdc247406aa21d16644828771b377c042bfdeb6 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Sun, 21 Jul 2024 13:48:27 +0200 Subject: [PATCH 3/5] aggregator: make linter happy Signed-off-by: Dr. Stefan Schimanski --- .../remote/remote_available_controller.go | 4 +- .../remote_available_controller_test.go | 82 ++++++++++++------- 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go index baced64a369..146c6a51426 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go @@ -313,7 +313,7 @@ func (c *AvailableConditionController) sync(key string) error { resp.Body.Close() // we should always been in the 200s or 300s if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices { - errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode) + errCh <- fmt.Errorf("bad status from %v: %d", discoveryURL, resp.StatusCode) return } } @@ -324,7 +324,7 @@ func (c *AvailableConditionController) sync(key string) error { select { case err = <-errCh: if err != nil { - results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err) + results <- fmt.Errorf("failing or missing response from %v: %w", discoveryURL, err) return } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go index edd330cfe65..d08c9ae1784 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go @@ -25,11 +25,9 @@ import ( "testing" "time" - availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" - "k8s.io/utils/pointer" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/dump" v1listers "k8s.io/client-go/listers/core/v1" clienttesting "k8s.io/client-go/testing" @@ -39,11 +37,13 @@ import ( "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake" apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" + availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" + "k8s.io/utils/ptr" ) const ( - testServicePort = 1234 - testServicePortName = "testPort" + testServicePort int32 = 1234 + testServicePortName = "testPort" ) func newEndpoints(namespace, name string) *v1.Endpoints { @@ -100,13 +100,18 @@ func newRemoteAPIService(name string) *apiregistration.APIService { Service: &apiregistration.ServiceReference{ Namespace: "foo", Name: "bar", - Port: pointer.Int32Ptr(testServicePort), + Port: ptr.To(testServicePort), }, }, } } -func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableConditionController, *fake.Clientset) { +type T interface { + Fatalf(format string, args ...interface{}) + Errorf(format string, args ...interface{}) +} + +func setupAPIServices(t T, apiServices []runtime.Object) (*AvailableConditionController, *fake.Clientset) { fakeClient := fake.NewSimpleClientset() apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) @@ -118,7 +123,9 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond defer testServer.Close() for _, o := range apiServices { - apiServiceIndexer.Add(o) + if err := apiServiceIndexer.Add(o); err != nil { + t.Fatalf("failed to add APIService: %v", err) + } } c := AvailableConditionController{ @@ -145,7 +152,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond func BenchmarkBuildCache(b *testing.B) { apiServiceName := "remote.group" // model 1 APIService pointing at a given service, and 30 pointing at local group/versions - apiServices := []*apiregistration.APIService{newRemoteAPIService(apiServiceName)} + apiServices := []runtime.Object{newRemoteAPIService(apiServiceName)} for i := 0; i < 30; i++ { apiServices = append(apiServices, newLocalAPIService(fmt.Sprintf("local.group%d", i))) } @@ -154,7 +161,7 @@ func BenchmarkBuildCache(b *testing.B) { for i := 0; i < 100; i++ { services = append(services, newService("foo", fmt.Sprintf("bar%d", i), testServicePort, testServicePortName)) } - c, _ := setupAPIServices(apiServices) + c, _ := setupAPIServices(b, apiServices) b.ReportAllocs() b.ResetTimer() for n := 1; n <= b.N; n++ { @@ -175,7 +182,7 @@ func TestBuildCache(t *testing.T) { name string apiServiceName string - apiServices []*apiregistration.APIService + apiServices []runtime.Object services []*v1.Service endpoints []*v1.Endpoints @@ -184,13 +191,13 @@ func TestBuildCache(t *testing.T) { { name: "api service", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - c, fakeClient := setupAPIServices(tc.apiServices) + c, fakeClient := setupAPIServices(t, tc.apiServices) for _, svc := range tc.services { c.addService(svc) } @@ -210,18 +217,19 @@ func TestSync(t *testing.T) { name string apiServiceName string - apiServices []*apiregistration.APIService + apiServices []runtime.Object services []*v1.Service endpoints []*v1.Endpoints backendStatus int backendLocation string expectedAvailability apiregistration.APIServiceCondition + expectedSyncError string }{ { name: "local", apiServiceName: "local.group", - apiServices: []*apiregistration.APIService{newLocalAPIService("local.group")}, + apiServices: []runtime.Object{newLocalAPIService("local.group")}, backendStatus: http.StatusOK, expectedAvailability: apiregistration.APIServiceCondition{ Type: apiregistration.Available, @@ -233,7 +241,7 @@ func TestSync(t *testing.T) { { name: "no service", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "not-bar", testServicePort, testServicePortName)}, backendStatus: http.StatusOK, expectedAvailability: apiregistration.APIServiceCondition{ @@ -246,7 +254,7 @@ func TestSync(t *testing.T) { { name: "service on bad port", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{{ ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"}, Spec: v1.ServiceSpec{ @@ -268,7 +276,7 @@ func TestSync(t *testing.T) { { name: "no endpoints", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, backendStatus: http.StatusOK, expectedAvailability: apiregistration.APIServiceCondition{ @@ -281,7 +289,7 @@ func TestSync(t *testing.T) { { name: "missing endpoints", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, endpoints: []*v1.Endpoints{newEndpoints("foo", "bar")}, backendStatus: http.StatusOK, @@ -295,7 +303,7 @@ func TestSync(t *testing.T) { { name: "wrong endpoint port name", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, "wrongName")}, backendStatus: http.StatusOK, @@ -309,7 +317,7 @@ func TestSync(t *testing.T) { { name: "remote", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)}, backendStatus: http.StatusOK, @@ -323,7 +331,7 @@ func TestSync(t *testing.T) { { name: "remote-bad-return", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)}, backendStatus: http.StatusForbidden, @@ -333,11 +341,12 @@ func TestSync(t *testing.T) { Reason: "FailedDiscoveryCheck", Message: `failing or missing response from`, }, + expectedSyncError: "failing or missing response from", }, { name: "remote-redirect", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)}, backendStatus: http.StatusFound, @@ -348,11 +357,12 @@ func TestSync(t *testing.T) { Reason: "FailedDiscoveryCheck", Message: `failing or missing response from`, }, + expectedSyncError: "failing or missing response from", }, { name: "remote-304", apiServiceName: "remote.group", - apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")}, + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)}, endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)}, backendStatus: http.StatusNotModified, @@ -362,12 +372,13 @@ func TestSync(t *testing.T) { Reason: "FailedDiscoveryCheck", Message: `failing or missing response from`, }, + expectedSyncError: "failing or missing response from", }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - fakeClient := fake.NewSimpleClientset() + fakeClient := fake.NewSimpleClientset(tc.apiServices...) apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) @@ -398,7 +409,16 @@ func TestSync(t *testing.T) { proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() }, metrics: availabilitymetrics.New(), } - c.sync(tc.apiServiceName) + err := c.sync(tc.apiServiceName) + if tc.expectedSyncError != "" { + if err == nil { + t.Fatalf("%v expected error with %q, got none", tc.name, tc.expectedSyncError) + } else if !strings.Contains(err.Error(), tc.expectedSyncError) { + t.Fatalf("%v expected error with %q, got %q", tc.name, tc.expectedSyncError, err.Error()) + } + } else if err != nil { + t.Fatalf("%v unexpected sync error: %v", tc.name, err) + } // ought to have one action writing status if e, a := 1, len(fakeClient.Actions()); e != a { @@ -445,19 +465,23 @@ func TestUpdateAPIServiceStatus(t *testing.T) { foo := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "foo"}}}} bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}} - fakeClient := fake.NewSimpleClientset() + fakeClient := fake.NewSimpleClientset(foo) c := AvailableConditionController{ apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), metrics: availabilitymetrics.New(), } - c.updateAPIServiceStatus(foo, foo) + if _, err := c.updateAPIServiceStatus(foo, foo); err != nil { + t.Fatalf("unexpected error: %v", err) + } if e, a := 0, len(fakeClient.Actions()); e != a { t.Error(dump.Pretty(fakeClient.Actions())) } fakeClient.ClearActions() - c.updateAPIServiceStatus(foo, bar) + if _, err := c.updateAPIServiceStatus(foo, bar); err != nil { + t.Fatalf("unexpected error: %v", err) + } if e, a := 1, len(fakeClient.Actions()); e != a { t.Error(dump.Pretty(fakeClient.Actions())) } From 834cd7ca4a1c08b5d32d5e2da377310764f2c11c Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 17 Jul 2024 10:50:28 +0200 Subject: [PATCH 4/5] aggregator: split availability controller into local and remote part Signed-off-by: Dr. Stefan Schimanski --- .../pkg/apiserver/apiserver.go | 54 +++-- .../local/local_available_controller.go | 227 ++++++++++++++++++ .../local/local_available_controller_test.go | 168 +++++++++++++ .../remote/remote_available_controller.go | 28 +-- .../remote_available_controller_test.go | 9 + test/e2e/apimachinery/health_handlers.go | 9 +- 6 files changed, 457 insertions(+), 38 deletions(-) create mode 100644 staging/src/k8s.io/kube-aggregator/pkg/controllers/status/local/local_available_controller.go create mode 100644 staging/src/k8s.io/kube-aggregator/pkg/controllers/status/local/local_available_controller_test.go diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 82d67bf80d1..ac91fcf6ea0 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -51,8 +51,9 @@ import ( openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator" openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3" openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator" + localavailability "k8s.io/kube-aggregator/pkg/controllers/status/local" availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" - statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status/remote" + remoteavailability "k8s.io/kube-aggregator/pkg/controllers/status/remote" apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest" openapicommon "k8s.io/kube-openapi/pkg/common" ) @@ -102,12 +103,11 @@ type ExtraConfig struct { RejectForwardingRedirects bool - // DisableAvailableConditionController disables the controller that updates the Available conditions for - // APIServices, Endpoints and Services. This controller runs in kube-aggregator and can interfere with - // Generic Control Plane components when certain apis are not available. - // TODO: We should find a better way to handle this. For now it will be for Generic Control Plane authors to - // disable this controller if they see issues. - DisableAvailableConditionController bool + // DisableRemoteAvailableConditionController disables the controller that updates the Available conditions for + // remote APIServices via querying endpoints of the referenced services. In generic controlplane use-cases, + // the concept of services and endpoints might differ, and might require another implementation of this + // controller. Local APIService are reconciled nevertheless. + DisableRemoteAvailableConditionController bool } // Config represents the configuration needed to create an APIAggregator. @@ -320,6 +320,12 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg }) } + s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { + informerFactory.Start(context.Done()) + c.GenericConfig.SharedInformerFactory.Start(context.Done()) + return nil + }) + // create shared (remote and local) availability metrics // TODO: decouple from legacyregistry metrics := availabilitymetrics.New() @@ -328,10 +334,25 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil, err } - // If the AvailableConditionController is disabled, we don't need to start the informers - // and the controller. - if !c.ExtraConfig.DisableAvailableConditionController { - availableController, err := statuscontrollers.NewAvailableConditionController( + // always run local availability controller + local, err := localavailability.New( + informerFactory.Apiregistration().V1().APIServices(), + apiregistrationClient.ApiregistrationV1(), + metrics, + ) + if err != nil { + return nil, err + } + s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-local-available-controller", func(context genericapiserver.PostStartHookContext) error { + // if we end up blocking for long periods of time, we may need to increase workers. + go local.Run(5, context.Done()) + return nil + }) + + // conditionally run remote availability controller. This could be replaced in certain + // generic controlplane use-cases where there is another concept of services and/or endpoints. + if !c.ExtraConfig.DisableRemoteAvailableConditionController { + remote, err := remoteavailability.New( informerFactory.Apiregistration().V1().APIServices(), c.GenericConfig.SharedInformerFactory.Core().V1().Services(), c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(), @@ -344,16 +365,9 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg if err != nil { return nil, err } - - s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error { - informerFactory.Start(context.Done()) - c.GenericConfig.SharedInformerFactory.Start(context.Done()) - return nil - }) - - s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error { + s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-remote-available-controller", func(context genericapiserver.PostStartHookContext) error { // if we end up blocking for long periods of time, we may need to increase workers. - go availableController.Run(5, context.Done()) + go remote.Run(5, context.Done()) return nil }) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/local/local_available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/local/local_available_controller.go new file mode 100644 index 00000000000..982ff69feb5 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/local/local_available_controller.go @@ -0,0 +1,227 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package external + +import ( + "context" + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper" + apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" + informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" + listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" + "k8s.io/kube-aggregator/pkg/controllers" + availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" +) + +// AvailableConditionController handles checking the availability of registered local API services. +type AvailableConditionController struct { + apiServiceClient apiregistrationclient.APIServicesGetter + + apiServiceLister listers.APIServiceLister + apiServiceSynced cache.InformerSynced + + // To allow injection for testing. + syncFn func(key string) error + + queue workqueue.TypedRateLimitingInterface[string] + + // metrics registered into legacy registry + metrics *availabilitymetrics.Metrics +} + +// New returns a new local availability AvailableConditionController. +func New( + apiServiceInformer informers.APIServiceInformer, + apiServiceClient apiregistrationclient.APIServicesGetter, + metrics *availabilitymetrics.Metrics, +) (*AvailableConditionController, error) { + c := &AvailableConditionController{ + apiServiceClient: apiServiceClient, + apiServiceLister: apiServiceInformer.Lister(), + queue: workqueue.NewTypedRateLimitingQueueWithConfig( + // We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the + // service network, it is possible for an external, non-watchable factor to affect availability. This keeps + // the maximum disruption time to a minimum, but it does prevent hot loops. + workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second), + workqueue.TypedRateLimitingQueueConfig[string]{Name: "LocalAvailabilityController"}, + ), + metrics: metrics, + } + + // resync on this one because it is low cardinality and rechecking the actual discovery + // allows us to detect health in a more timely fashion when network connectivity to + // nodes is snipped, but the network still attempts to route there. See + // https://github.com/openshift/origin/issues/17159#issuecomment-341798063 + apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addAPIService, + UpdateFunc: c.updateAPIService, + DeleteFunc: c.deleteAPIService, + }, + 30*time.Second) + c.apiServiceSynced = apiServiceHandler.HasSynced + + c.syncFn = c.sync + + return c, nil +} + +func (c *AvailableConditionController) sync(key string) error { + originalAPIService, err := c.apiServiceLister.Get(key) + if apierrors.IsNotFound(err) { + c.metrics.ForgetAPIService(key) + return nil + } + if err != nil { + return err + } + + if originalAPIService.Spec.Service != nil { + // this controller only handles local APIServices + return nil + } + + // local API services are always considered available + apiService := originalAPIService.DeepCopy() + apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition()) + _, err = c.updateAPIServiceStatus(originalAPIService, apiService) + return err +} + +// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead +// apiservices. Doing that means we don't want to quickly issue no-op updates. +func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) { + // update this metric on every sync operation to reflect the actual state + c.metrics.SetUnavailableGauge(newAPIService) + + if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) { + return newAPIService, nil + } + + orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available) + now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available) + unknown := apiregistrationv1.APIServiceCondition{ + Type: apiregistrationv1.Available, + Status: apiregistrationv1.ConditionUnknown, + } + if orig == nil { + orig = &unknown + } + if now == nil { + now = &unknown + } + if *orig != *now { + klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason) + } + + newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{}) + if err != nil { + return nil, err + } + + c.metrics.SetUnavailableCounter(originalAPIService, newAPIService) + return newAPIService, nil +} + +// Run starts the AvailableConditionController loop which manages the availability condition of API services. +func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + klog.Info("Starting LocalAvailability controller") + defer klog.Info("Shutting down LocalAvailability controller") + + // This waits not just for the informers to sync, but for our handlers + // to be called; since the handlers are three different ways of + // enqueueing the same thing, waiting for this permits the queue to + // maximally de-duplicate the entries. + if !controllers.WaitForCacheSync("LocalAvailability", stopCh, c.apiServiceSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-stopCh +} + +func (c *AvailableConditionController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (c *AvailableConditionController) processNextWorkItem() bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + err := c.syncFn(key) + if err == nil { + c.queue.Forget(key) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with: %w", key, err)) + c.queue.AddRateLimited(key) + + return true +} + +func (c *AvailableConditionController) addAPIService(obj interface{}) { + castObj := obj.(*apiregistrationv1.APIService) + klog.V(4).Infof("Adding %s", castObj.Name) + c.queue.Add(castObj.Name) +} + +func (c *AvailableConditionController) updateAPIService(oldObj, _ interface{}) { + oldCastObj := oldObj.(*apiregistrationv1.APIService) + klog.V(4).Infof("Updating %s", oldCastObj.Name) + c.queue.Add(oldCastObj.Name) +} + +func (c *AvailableConditionController) deleteAPIService(obj interface{}) { + castObj, ok := obj.(*apiregistrationv1.APIService) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + klog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService) + if !ok { + klog.Errorf("Tombstone contained object that is not expected %#v", obj) + return + } + } + klog.V(4).Infof("Deleting %q", castObj.Name) + c.queue.Add(castObj.Name) +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/local/local_available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/local/local_available_controller_test.go new file mode 100644 index 00000000000..a64c1d7d51c --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/local/local_available_controller_test.go @@ -0,0 +1,168 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package external + +import ( + "strings" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/dump" + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake" + apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" + listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" + availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics" + "k8s.io/utils/ptr" +) + +const ( + testServicePort int32 = 1234 +) + +func newLocalAPIService(name string) *apiregistration.APIService { + return &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + } +} + +func newRemoteAPIService(name string) *apiregistration.APIService { + return &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: apiregistration.APIServiceSpec{ + Group: strings.SplitN(name, ".", 2)[0], + Version: strings.SplitN(name, ".", 2)[1], + Service: &apiregistration.ServiceReference{ + Namespace: "foo", + Name: "bar", + Port: ptr.To(testServicePort), + }, + }, + } +} + +func TestSync(t *testing.T) { + tests := []struct { + name string + + apiServiceName string + apiServices []runtime.Object + + expectedAvailability apiregistration.APIServiceCondition + expectedAction bool + }{ + { + name: "local", + apiServiceName: "local.group", + apiServices: []runtime.Object{newLocalAPIService("local.group")}, + expectedAvailability: apiregistration.APIServiceCondition{ + Type: apiregistration.Available, + Status: apiregistration.ConditionTrue, + Reason: "Local", + Message: "Local APIServices are always available", + }, + expectedAction: true, + }, + { + name: "remote", + apiServiceName: "remote.group", + apiServices: []runtime.Object{newRemoteAPIService("remote.group")}, + expectedAction: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset(tc.apiServices...) + apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + for _, obj := range tc.apiServices { + if err := apiServiceIndexer.Add(obj); err != nil { + t.Fatalf("failed to add object to indexer: %v", err) + } + } + + c := AvailableConditionController{ + apiServiceClient: fakeClient.ApiregistrationV1(), + apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer), + metrics: availabilitymetrics.New(), + } + if err := c.sync(tc.apiServiceName); err != nil { + t.Fatalf("unexpect sync error: %v", err) + } + + // ought to have one action writing status + if e, a := tc.expectedAction, len(fakeClient.Actions()) == 1; e != a { + t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions()) + } + if tc.expectedAction { + action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction) + if !ok { + t.Fatalf("%v got %v", tc.name, ok) + } + + if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a { + t.Fatalf("%v expected %v, got %v", tc.name, e, action.GetObject()) + } + condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0] + if e, a := tc.expectedAvailability.Type, condition.Type; e != a { + t.Errorf("%v expected %v, got %#v", tc.name, e, condition) + } + if e, a := tc.expectedAvailability.Status, condition.Status; e != a { + t.Errorf("%v expected %v, got %#v", tc.name, e, condition) + } + if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a { + t.Errorf("%v expected %v, got %#v", tc.name, e, condition) + } + if e, a := tc.expectedAvailability.Message, condition.Message; !strings.HasPrefix(a, e) { + t.Errorf("%v expected %v, got %#v", tc.name, e, condition) + } + if condition.LastTransitionTime.IsZero() { + t.Error("expected lastTransitionTime to be non-zero") + } + } + }) + } +} + +func TestUpdateAPIServiceStatus(t *testing.T) { + foo := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "foo"}}}} + bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}} + + fakeClient := fake.NewSimpleClientset(foo) + c := AvailableConditionController{ + apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter), + metrics: availabilitymetrics.New(), + } + + if _, err := c.updateAPIServiceStatus(foo, foo); err != nil { + t.Fatalf("unexpected updateAPIServiceStatus error: %v", err) + } + if e, a := 0, len(fakeClient.Actions()); e != a { + t.Error(dump.Pretty(fakeClient.Actions())) + } + + fakeClient.ClearActions() + if _, err := c.updateAPIServiceStatus(foo, bar); err != nil { + t.Fatalf("unexpected updateAPIServiceStatus error: %v", err) + } + if e, a := 1, len(fakeClient.Actions()); e != a { + t.Error(dump.Pretty(fakeClient.Actions())) + } +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go index 146c6a51426..ade744708cd 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller.go @@ -88,8 +88,8 @@ type AvailableConditionController struct { metrics *availabilitymetrics.Metrics } -// NewAvailableConditionController returns a new AvailableConditionController. -func NewAvailableConditionController( +// New returns a new remote APIService AvailableConditionController. +func New( apiServiceInformer informers.APIServiceInformer, serviceInformer v1informers.ServiceInformer, endpointsInformer v1informers.EndpointsInformer, @@ -110,7 +110,7 @@ func NewAvailableConditionController( // service network, it is possible for an external, non-watchable factor to affect availability. This keeps // the maximum disruption time to a minimum, but it does prevent hot loops. workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second), - workqueue.TypedRateLimitingQueueConfig[string]{Name: "AvailableConditionController"}, + workqueue.TypedRateLimitingQueueConfig[string]{Name: "RemoteAvailabilityController"}, ), proxyTransportDial: proxyTransportDial, proxyCurrentCertKeyContent: proxyCurrentCertKeyContent, @@ -159,6 +159,13 @@ func (c *AvailableConditionController) sync(key string) error { return err } + if originalAPIService.Spec.Service == nil { + // handled by the local APIService controller + return nil + } + + apiService := originalAPIService.DeepCopy() + // if a particular transport was specified, use that otherwise build one // construct an http client that will ignore TLS verification (if someone owns the network and messes with your status // that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information @@ -188,21 +195,12 @@ func (c *AvailableConditionController) sync(key string) error { }, } - apiService := originalAPIService.DeepCopy() - availableCondition := apiregistrationv1.APIServiceCondition{ Type: apiregistrationv1.Available, Status: apiregistrationv1.ConditionTrue, LastTransitionTime: metav1.Now(), } - // local API services are always considered available - if apiService.Spec.Service == nil { - apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition()) - _, err := c.updateAPIServiceStatus(originalAPIService, apiService) - return err - } - service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name) if apierrors.IsNotFound(err) { availableCondition.Status = apiregistrationv1.ConditionFalse @@ -410,14 +408,14 @@ func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) defer utilruntime.HandleCrash() defer c.queue.ShutDown() - klog.Info("Starting AvailableConditionController") - defer klog.Info("Shutting down AvailableConditionController") + klog.Info("Starting RemoteAvailability controller") + defer klog.Info("Shutting down RemoteAvailability controller") // This waits not just for the informers to sync, but for our handlers // to be called; since the handlers are three different ways of // enqueueing the same thing, waiting for this permits the queue to // maximally de-duplicate the entries. - if !controllers.WaitForCacheSync("AvailableConditionController", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) { + if !controllers.WaitForCacheSync("RemoteAvailability", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) { return } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go index d08c9ae1784..acfe9ba3952 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/remote/remote_available_controller_test.go @@ -225,6 +225,7 @@ func TestSync(t *testing.T) { expectedAvailability apiregistration.APIServiceCondition expectedSyncError string + expectedSkipped bool }{ { name: "local", @@ -237,6 +238,7 @@ func TestSync(t *testing.T) { Reason: "Local", Message: "Local APIServices are always available", }, + expectedSkipped: true, }, { name: "no service", @@ -420,6 +422,13 @@ func TestSync(t *testing.T) { t.Fatalf("%v unexpected sync error: %v", tc.name, err) } + if tc.expectedSkipped { + if len(fakeClient.Actions()) > 0 { + t.Fatalf("%v expected no actions, got %v", tc.name, fakeClient.Actions()) + } + return + } + // ought to have one action writing status if e, a := 1, len(fakeClient.Actions()); e != a { t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions()) diff --git a/test/e2e/apimachinery/health_handlers.go b/test/e2e/apimachinery/health_handlers.go index 6693afd3e16..ea19569516a 100644 --- a/test/e2e/apimachinery/health_handlers.go +++ b/test/e2e/apimachinery/health_handlers.go @@ -51,7 +51,8 @@ var ( "[+]poststarthook/start-cluster-authentication-info-controller ok", "[+]poststarthook/start-kube-aggregator-informers ok", "[+]poststarthook/apiservice-registration-controller ok", - "[+]poststarthook/apiservice-status-available-controller ok", + "[+]poststarthook/apiservice-status-local-available-controller ok", + "[+]poststarthook/apiservice-status-remote-available-controller ok", "[+]poststarthook/kube-apiserver-autoregistration ok", "[+]autoregister-completion ok", "[+]poststarthook/apiservice-openapi-controller ok", @@ -72,7 +73,8 @@ var ( "[+]poststarthook/start-cluster-authentication-info-controller ok", "[+]poststarthook/start-kube-aggregator-informers ok", "[+]poststarthook/apiservice-registration-controller ok", - "[+]poststarthook/apiservice-status-available-controller ok", + "[+]poststarthook/apiservice-status-local-available-controller ok", + "[+]poststarthook/apiservice-status-remote-available-controller ok", "[+]poststarthook/kube-apiserver-autoregistration ok", "[+]autoregister-completion ok", "[+]poststarthook/apiservice-openapi-controller ok", @@ -94,7 +96,8 @@ var ( "[+]poststarthook/start-cluster-authentication-info-controller ok", "[+]poststarthook/start-kube-aggregator-informers ok", "[+]poststarthook/apiservice-registration-controller ok", - "[+]poststarthook/apiservice-status-available-controller ok", + "[+]poststarthook/apiservice-status-local-available-controller ok", + "[+]poststarthook/apiservice-status-remote-available-controller ok", "[+]poststarthook/kube-apiserver-autoregistration ok", "[+]autoregister-completion ok", "[+]poststarthook/apiservice-openapi-controller ok", From b27142852f664978e83d5eff390f4c047c227608 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Sun, 21 Jul 2024 17:41:50 +0200 Subject: [PATCH 5/5] test/integration: adapt numbers in TestAPIServerTransportMetrics with less rest client creations Signed-off-by: Dr. Stefan Schimanski --- test/integration/client/metrics/metrics_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/integration/client/metrics/metrics_test.go b/test/integration/client/metrics/metrics_test.go index 5f0779823b4..af88050deba 100644 --- a/test/integration/client/metrics/metrics_test.go +++ b/test/integration/client/metrics/metrics_test.go @@ -57,11 +57,11 @@ func TestAPIServerTransportMetrics(t *testing.T) { // IMPORTANT: reflect the current values if the test changes // client_test.go:1407: metric rest_client_transport_cache_entries 3 - // client_test.go:1407: metric rest_client_transport_create_calls_total{result="hit"} 61 + // client_test.go:1407: metric rest_client_transport_create_calls_total{result="hit"} 20 // client_test.go:1407: metric rest_client_transport_create_calls_total{result="miss"} 3 hits1, misses1, entries1 := checkTransportMetrics(t, client) // hit ratio at startup depends on multiple factors - if (hits1*100)/(hits1+misses1) < 90 { + if (hits1*100)/(hits1+misses1) < 85 { t.Fatalf("transport cache hit ratio %d lower than 90 percent", (hits1*100)/(hits1+misses1)) } @@ -114,7 +114,7 @@ func TestAPIServerTransportMetrics(t *testing.T) { } // hit ratio after startup should grow since no new transports are expected - if (hits2*100)/(hits2+misses2) < 95 { + if (hits2*100)/(hits2+misses2) < 85 { t.Fatalf("transport cache hit ratio %d lower than 95 percent", (hits2*100)/(hits2+misses2)) } }