Merge pull request #126149 from sttts/sttts-aggregator-availability-controller-split

Step 11 - Split aggregator availability controller into local and remote part
This commit is contained in:
Kubernetes Prow Robot 2024-07-21 09:54:46 -07:00 committed by GitHub
commit 69eee1c4a2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 575 additions and 125 deletions

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -37,6 +38,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/transport"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/tracing"
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
@ -49,11 +51,16 @@ import (
openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3"
openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
localavailability "k8s.io/kube-aggregator/pkg/controllers/status/local"
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
remoteavailability "k8s.io/kube-aggregator/pkg/controllers/status/remote"
apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
openapicommon "k8s.io/kube-openapi/pkg/common"
)
// making sure we only register metrics once into legacy registry
var registerIntoLegacyRegistryOnce sync.Once
func init() {
// we need to add the options (like ListOptions) to empty v1
metav1.AddToGroupVersion(aggregatorscheme.Scheme, schema.GroupVersion{Group: "", Version: "v1"})
@ -96,12 +103,11 @@ type ExtraConfig struct {
RejectForwardingRedirects bool
// DisableAvailableConditionController disables the controller that updates the Available conditions for
// APIServices, Endpoints and Services. This controller runs in kube-aggregator and can interfere with
// Generic Control Plane components when certain apis are not available.
// TODO: We should find a better way to handle this. For now it will be for Generic Control Plane authors to
// disable this controller if they see issues.
DisableAvailableConditionController bool
// DisableRemoteAvailableConditionController disables the controller that updates the Available conditions for
// remote APIServices via querying endpoints of the referenced services. In generic controlplane use-cases,
// the concept of services and endpoints might differ, and might require another implementation of this
// controller. Local APIService are reconciled nevertheless.
DisableRemoteAvailableConditionController bool
}
// Config represents the configuration needed to create an APIAggregator.
@ -314,10 +320,39 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
})
}
// If the AvailableConditionController is disabled, we don't need to start the informers
// and the controller.
if !c.ExtraConfig.DisableAvailableConditionController {
availableController, err := statuscontrollers.NewAvailableConditionController(
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.Done())
c.GenericConfig.SharedInformerFactory.Start(context.Done())
return nil
})
// create shared (remote and local) availability metrics
// TODO: decouple from legacyregistry
metrics := availabilitymetrics.New()
registerIntoLegacyRegistryOnce.Do(func() { err = metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister) })
if err != nil {
return nil, err
}
// always run local availability controller
local, err := localavailability.New(
informerFactory.Apiregistration().V1().APIServices(),
apiregistrationClient.ApiregistrationV1(),
metrics,
)
if err != nil {
return nil, err
}
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-local-available-controller", func(context genericapiserver.PostStartHookContext) error {
// if we end up blocking for long periods of time, we may need to increase workers.
go local.Run(5, context.Done())
return nil
})
// conditionally run remote availability controller. This could be replaced in certain
// generic controlplane use-cases where there is another concept of services and/or endpoints.
if !c.ExtraConfig.DisableRemoteAvailableConditionController {
remote, err := remoteavailability.New(
informerFactory.Apiregistration().V1().APIServices(),
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
@ -325,20 +360,14 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
proxyTransportDial,
(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
s.serviceResolver,
metrics,
)
if err != nil {
return nil, err
}
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.Done())
c.GenericConfig.SharedInformerFactory.Start(context.Done())
return nil
})
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-remote-available-controller", func(context genericapiserver.PostStartHookContext) error {
// if we end up blocking for long periods of time, we may need to increase workers.
go availableController.Run(5, context.Done())
go remote.Run(5, context.Done())
return nil
})
}

View File

@ -0,0 +1,227 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package external
import (
"context"
"fmt"
"time"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers"
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
)
// AvailableConditionController handles checking the availability of registered local API services.
type AvailableConditionController struct {
apiServiceClient apiregistrationclient.APIServicesGetter
apiServiceLister listers.APIServiceLister
apiServiceSynced cache.InformerSynced
// To allow injection for testing.
syncFn func(key string) error
queue workqueue.TypedRateLimitingInterface[string]
// metrics registered into legacy registry
metrics *availabilitymetrics.Metrics
}
// New returns a new local availability AvailableConditionController.
func New(
apiServiceInformer informers.APIServiceInformer,
apiServiceClient apiregistrationclient.APIServicesGetter,
metrics *availabilitymetrics.Metrics,
) (*AvailableConditionController, error) {
c := &AvailableConditionController{
apiServiceClient: apiServiceClient,
apiServiceLister: apiServiceInformer.Lister(),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
// We want a fairly tight requeue time. The controller listens to the API, but because it relies on the routability of the
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "LocalAvailabilityController"},
),
metrics: metrics,
}
// resync on this one because it is low cardinality and rechecking the actual discovery
// allows us to detect health in a more timely fashion when network connectivity to
// nodes is snipped, but the network still attempts to route there. See
// https://github.com/openshift/origin/issues/17159#issuecomment-341798063
apiServiceHandler, _ := apiServiceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addAPIService,
UpdateFunc: c.updateAPIService,
DeleteFunc: c.deleteAPIService,
},
30*time.Second)
c.apiServiceSynced = apiServiceHandler.HasSynced
c.syncFn = c.sync
return c, nil
}
func (c *AvailableConditionController) sync(key string) error {
originalAPIService, err := c.apiServiceLister.Get(key)
if apierrors.IsNotFound(err) {
c.metrics.ForgetAPIService(key)
return nil
}
if err != nil {
return err
}
if originalAPIService.Spec.Service != nil {
// this controller only handles local APIServices
return nil
}
// local API services are always considered available
apiService := originalAPIService.DeepCopy()
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
_, err = c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
// updateAPIServiceStatus only issues an update if a change is detected. We have a tight resync loop to quickly detect dead
// apiservices. Doing that means we don't want to quickly issue no-op updates.
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
// update this metric on every sync operation to reflect the actual state
c.metrics.SetUnavailableGauge(newAPIService)
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
return newAPIService, nil
}
orig := apiregistrationv1apihelper.GetAPIServiceConditionByType(originalAPIService, apiregistrationv1.Available)
now := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available)
unknown := apiregistrationv1.APIServiceCondition{
Type: apiregistrationv1.Available,
Status: apiregistrationv1.ConditionUnknown,
}
if orig == nil {
orig = &unknown
}
if now == nil {
now = &unknown
}
if *orig != *now {
klog.V(2).InfoS("changing APIService availability", "name", newAPIService.Name, "oldStatus", orig.Status, "newStatus", now.Status, "message", now.Message, "reason", now.Reason)
}
newAPIService, err := c.apiServiceClient.APIServices().UpdateStatus(context.TODO(), newAPIService, metav1.UpdateOptions{})
if err != nil {
return nil, err
}
c.metrics.SetUnavailableCounter(originalAPIService, newAPIService)
return newAPIService, nil
}
// Run starts the AvailableConditionController loop which manages the availability condition of API services.
func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Info("Starting LocalAvailability controller")
defer klog.Info("Shutting down LocalAvailability controller")
// This waits not just for the informers to sync, but for our handlers
// to be called; since the handlers are three different ways of
// enqueueing the same thing, waiting for this permits the queue to
// maximally de-duplicate the entries.
if !controllers.WaitForCacheSync("LocalAvailability", stopCh, c.apiServiceSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
}
func (c *AvailableConditionController) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (c *AvailableConditionController) processNextWorkItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncFn(key)
if err == nil {
c.queue.Forget(key)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with: %w", key, err))
c.queue.AddRateLimited(key)
return true
}
func (c *AvailableConditionController) addAPIService(obj interface{}) {
castObj := obj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Adding %s", castObj.Name)
c.queue.Add(castObj.Name)
}
func (c *AvailableConditionController) updateAPIService(oldObj, _ interface{}) {
oldCastObj := oldObj.(*apiregistrationv1.APIService)
klog.V(4).Infof("Updating %s", oldCastObj.Name)
c.queue.Add(oldCastObj.Name)
}
func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
castObj, ok := obj.(*apiregistrationv1.APIService)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
klog.Errorf("Couldn't get object from tombstone %#v", obj)
return
}
castObj, ok = tombstone.Obj.(*apiregistrationv1.APIService)
if !ok {
klog.Errorf("Tombstone contained object that is not expected %#v", obj)
return
}
}
klog.V(4).Infof("Deleting %q", castObj.Name)
c.queue.Add(castObj.Name)
}

View File

@ -0,0 +1,168 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package external
import (
"strings"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/dump"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
"k8s.io/utils/ptr"
)
const (
testServicePort int32 = 1234
)
func newLocalAPIService(name string) *apiregistration.APIService {
return &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: name},
}
}
func newRemoteAPIService(name string) *apiregistration.APIService {
return &apiregistration.APIService{
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: apiregistration.APIServiceSpec{
Group: strings.SplitN(name, ".", 2)[0],
Version: strings.SplitN(name, ".", 2)[1],
Service: &apiregistration.ServiceReference{
Namespace: "foo",
Name: "bar",
Port: ptr.To(testServicePort),
},
},
}
}
func TestSync(t *testing.T) {
tests := []struct {
name string
apiServiceName string
apiServices []runtime.Object
expectedAvailability apiregistration.APIServiceCondition
expectedAction bool
}{
{
name: "local",
apiServiceName: "local.group",
apiServices: []runtime.Object{newLocalAPIService("local.group")},
expectedAvailability: apiregistration.APIServiceCondition{
Type: apiregistration.Available,
Status: apiregistration.ConditionTrue,
Reason: "Local",
Message: "Local APIServices are always available",
},
expectedAction: true,
},
{
name: "remote",
apiServiceName: "remote.group",
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
expectedAction: false,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset(tc.apiServices...)
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
for _, obj := range tc.apiServices {
if err := apiServiceIndexer.Add(obj); err != nil {
t.Fatalf("failed to add object to indexer: %v", err)
}
}
c := AvailableConditionController{
apiServiceClient: fakeClient.ApiregistrationV1(),
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
metrics: availabilitymetrics.New(),
}
if err := c.sync(tc.apiServiceName); err != nil {
t.Fatalf("unexpect sync error: %v", err)
}
// ought to have one action writing status
if e, a := tc.expectedAction, len(fakeClient.Actions()) == 1; e != a {
t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
}
if tc.expectedAction {
action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction)
if !ok {
t.Fatalf("%v got %v", tc.name, ok)
}
if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a {
t.Fatalf("%v expected %v, got %v", tc.name, e, action.GetObject())
}
condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0]
if e, a := tc.expectedAvailability.Type, condition.Type; e != a {
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
}
if e, a := tc.expectedAvailability.Status, condition.Status; e != a {
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
}
if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a {
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
}
if e, a := tc.expectedAvailability.Message, condition.Message; !strings.HasPrefix(a, e) {
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
}
if condition.LastTransitionTime.IsZero() {
t.Error("expected lastTransitionTime to be non-zero")
}
}
})
}
}
func TestUpdateAPIServiceStatus(t *testing.T) {
foo := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "foo"}}}}
bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}}
fakeClient := fake.NewSimpleClientset(foo)
c := AvailableConditionController{
apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter),
metrics: availabilitymetrics.New(),
}
if _, err := c.updateAPIServiceStatus(foo, foo); err != nil {
t.Fatalf("unexpected updateAPIServiceStatus error: %v", err)
}
if e, a := 0, len(fakeClient.Actions()); e != a {
t.Error(dump.Pretty(fakeClient.Actions()))
}
fakeClient.ClearActions()
if _, err := c.updateAPIServiceStatus(foo, bar); err != nil {
t.Fatalf("unexpected updateAPIServiceStatus error: %v", err)
}
if e, a := 1, len(fakeClient.Actions()); e != a {
t.Error(dump.Pretty(fakeClient.Actions()))
}
}

View File

@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package metrics
import (
"sync"
"k8s.io/component-base/metrics"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
)
/*
@ -41,14 +43,14 @@ var (
)
)
type availabilityMetrics struct {
type Metrics struct {
unavailableCounter *metrics.CounterVec
*availabilityCollector
}
func newAvailabilityMetrics() *availabilityMetrics {
return &availabilityMetrics{
func New() *Metrics {
return &Metrics{
unavailableCounter: metrics.NewCounterVec(
&metrics.CounterOpts{
Name: "aggregator_unavailable_apiservice_total",
@ -62,7 +64,7 @@ func newAvailabilityMetrics() *availabilityMetrics {
}
// Register registers apiservice availability metrics.
func (m *availabilityMetrics) Register(
func (m *Metrics) Register(
registrationFunc func(metrics.Registerable) error,
customRegistrationFunc func(metrics.StableCollector) error,
) error {
@ -80,7 +82,7 @@ func (m *availabilityMetrics) Register(
}
// UnavailableCounter returns a counter to track apiservices marked as unavailable.
func (m *availabilityMetrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric {
func (m *Metrics) UnavailableCounter(apiServiceName, reason string) metrics.CounterMetric {
return m.unavailableCounter.WithLabelValues(apiServiceName, reason)
}
@ -91,6 +93,31 @@ type availabilityCollector struct {
availabilities map[string]bool
}
// SetUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service
func (m *Metrics) SetUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) {
m.SetAPIServiceAvailable(newAPIService.Name)
return
}
m.SetAPIServiceUnavailable(newAPIService.Name)
}
// SetUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed
func (m *Metrics) SetUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available)
isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available)
statusChanged := isAvailable != wasAvailable
if statusChanged && !isAvailable {
reason := "UnknownReason"
if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil {
reason = newCondition.Reason
}
m.UnavailableCounter(newAPIService.Name, reason).Inc()
}
}
// Check if apiServiceStatusCollector implements necessary interface.
var _ metrics.StableCollector = &availabilityCollector{}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package metrics
import (
"strings"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package remote
import (
"context"
@ -39,7 +39,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
@ -47,11 +46,9 @@ import (
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/controllers"
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
)
// making sure we only register metrics once into legacy registry
var registerIntoLegacyRegistryOnce sync.Once
type certKeyFunc func() ([]byte, []byte)
// ServiceResolver knows how to convert a service reference into an actual location.
@ -88,11 +85,11 @@ type AvailableConditionController struct {
cacheLock sync.RWMutex
// metrics registered into legacy registry
metrics *availabilityMetrics
metrics *availabilitymetrics.Metrics
}
// NewAvailableConditionController returns a new AvailableConditionController.
func NewAvailableConditionController(
// New returns a new remote APIService AvailableConditionController.
func New(
apiServiceInformer informers.APIServiceInformer,
serviceInformer v1informers.ServiceInformer,
endpointsInformer v1informers.EndpointsInformer,
@ -100,6 +97,7 @@ func NewAvailableConditionController(
proxyTransportDial *transport.DialHolder,
proxyCurrentCertKeyContent certKeyFunc,
serviceResolver ServiceResolver,
metrics *availabilitymetrics.Metrics,
) (*AvailableConditionController, error) {
c := &AvailableConditionController{
apiServiceClient: apiServiceClient,
@ -112,11 +110,11 @@ func NewAvailableConditionController(
// service network, it is possible for an external, non-watchable factor to affect availability. This keeps
// the maximum disruption time to a minimum, but it does prevent hot loops.
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "AvailableConditionController"},
workqueue.TypedRateLimitingQueueConfig[string]{Name: "RemoteAvailabilityController"},
),
proxyTransportDial: proxyTransportDial,
proxyCurrentCertKeyContent: proxyCurrentCertKeyContent,
metrics: newAvailabilityMetrics(),
metrics: metrics,
}
// resync on this one because it is low cardinality and rechecking the actual discovery
@ -148,15 +146,6 @@ func NewAvailableConditionController(
c.syncFn = c.sync
// TODO: decouple from legacyregistry
var err error
registerIntoLegacyRegistryOnce.Do(func() {
err = c.metrics.Register(legacyregistry.Register, legacyregistry.CustomRegister)
})
if err != nil {
return nil, err
}
return c, nil
}
@ -170,6 +159,13 @@ func (c *AvailableConditionController) sync(key string) error {
return err
}
if originalAPIService.Spec.Service == nil {
// handled by the local APIService controller
return nil
}
apiService := originalAPIService.DeepCopy()
// if a particular transport was specified, use that otherwise build one
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
@ -199,21 +195,12 @@ func (c *AvailableConditionController) sync(key string) error {
},
}
apiService := originalAPIService.DeepCopy()
availableCondition := apiregistrationv1.APIServiceCondition{
Type: apiregistrationv1.Available,
Status: apiregistrationv1.ConditionTrue,
LastTransitionTime: metav1.Now(),
}
// local API services are always considered available
if apiService.Spec.Service == nil {
apiregistrationv1apihelper.SetAPIServiceCondition(apiService, apiregistrationv1apihelper.NewLocalAvailableAPIServiceCondition())
_, err := c.updateAPIServiceStatus(originalAPIService, apiService)
return err
}
service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
if apierrors.IsNotFound(err) {
availableCondition.Status = apiregistrationv1.ConditionFalse
@ -324,7 +311,7 @@ func (c *AvailableConditionController) sync(key string) error {
resp.Body.Close()
// we should always been in the 200s or 300s
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode)
errCh <- fmt.Errorf("bad status from %v: %d", discoveryURL, resp.StatusCode)
return
}
}
@ -335,7 +322,7 @@ func (c *AvailableConditionController) sync(key string) error {
select {
case err = <-errCh:
if err != nil {
results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err)
results <- fmt.Errorf("failing or missing response from %v: %w", discoveryURL, err)
return
}
@ -385,7 +372,7 @@ func (c *AvailableConditionController) sync(key string) error {
// apiservices. Doing that means we don't want to quickly issue no-op updates.
func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService, newAPIService *apiregistrationv1.APIService) (*apiregistrationv1.APIService, error) {
// update this metric on every sync operation to reflect the actual state
c.setUnavailableGauge(newAPIService)
c.metrics.SetUnavailableGauge(newAPIService)
if equality.Semantic.DeepEqual(originalAPIService.Status, newAPIService.Status) {
return newAPIService, nil
@ -412,7 +399,7 @@ func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService
return nil, err
}
c.setUnavailableCounter(originalAPIService, newAPIService)
c.metrics.SetUnavailableCounter(originalAPIService, newAPIService)
return newAPIService, nil
}
@ -421,14 +408,14 @@ func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{})
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Info("Starting AvailableConditionController")
defer klog.Info("Shutting down AvailableConditionController")
klog.Info("Starting RemoteAvailability controller")
defer klog.Info("Shutting down RemoteAvailability controller")
// This waits not just for the informers to sync, but for our handlers
// to be called; since the handlers are three different ways of
// enqueueing the same thing, waiting for this permits the queue to
// maximally de-duplicate the entries.
if !controllers.WaitForCacheSync("AvailableConditionController", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
if !controllers.WaitForCacheSync("RemoteAvailability", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
return
}
@ -599,28 +586,3 @@ func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
c.queue.Add(apiService)
}
}
// setUnavailableGauge set the metrics so that it reflect the current state base on availability of the given service
func (c *AvailableConditionController) setUnavailableGauge(newAPIService *apiregistrationv1.APIService) {
if apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available) {
c.metrics.SetAPIServiceAvailable(newAPIService.Name)
return
}
c.metrics.SetAPIServiceUnavailable(newAPIService.Name)
}
// setUnavailableCounter increases the metrics only if the given service is unavailable and its APIServiceCondition has changed
func (c *AvailableConditionController) setUnavailableCounter(originalAPIService, newAPIService *apiregistrationv1.APIService) {
wasAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(originalAPIService, apiregistrationv1.Available)
isAvailable := apiregistrationv1apihelper.IsAPIServiceConditionTrue(newAPIService, apiregistrationv1.Available)
statusChanged := isAvailable != wasAvailable
if statusChanged && !isAvailable {
reason := "UnknownReason"
if newCondition := apiregistrationv1apihelper.GetAPIServiceConditionByType(newAPIService, apiregistrationv1.Available); newCondition != nil {
reason = newCondition.Reason
}
c.metrics.UnavailableCounter(newAPIService.Name, reason).Inc()
}
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package apiserver
package remote
import (
"fmt"
@ -25,10 +25,9 @@ import (
"testing"
"time"
"k8s.io/utils/pointer"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/dump"
v1listers "k8s.io/client-go/listers/core/v1"
clienttesting "k8s.io/client-go/testing"
@ -38,11 +37,13 @@ import (
"k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/fake"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
availabilitymetrics "k8s.io/kube-aggregator/pkg/controllers/status/metrics"
"k8s.io/utils/ptr"
)
const (
testServicePort = 1234
testServicePortName = "testPort"
testServicePort int32 = 1234
testServicePortName = "testPort"
)
func newEndpoints(namespace, name string) *v1.Endpoints {
@ -99,13 +100,18 @@ func newRemoteAPIService(name string) *apiregistration.APIService {
Service: &apiregistration.ServiceReference{
Namespace: "foo",
Name: "bar",
Port: pointer.Int32Ptr(testServicePort),
Port: ptr.To(testServicePort),
},
},
}
}
func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableConditionController, *fake.Clientset) {
type T interface {
Fatalf(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
func setupAPIServices(t T, apiServices []runtime.Object) (*AvailableConditionController, *fake.Clientset) {
fakeClient := fake.NewSimpleClientset()
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
@ -117,7 +123,9 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond
defer testServer.Close()
for _, o := range apiServices {
apiServiceIndexer.Add(o)
if err := apiServiceIndexer.Add(o); err != nil {
t.Fatalf("failed to add APIService: %v", err)
}
}
c := AvailableConditionController{
@ -133,7 +141,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond
workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 30*time.Second),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "AvailableConditionController"},
),
metrics: newAvailabilityMetrics(),
metrics: availabilitymetrics.New(),
}
for _, svc := range apiServices {
c.addAPIService(svc)
@ -144,7 +152,7 @@ func setupAPIServices(apiServices []*apiregistration.APIService) (*AvailableCond
func BenchmarkBuildCache(b *testing.B) {
apiServiceName := "remote.group"
// model 1 APIService pointing at a given service, and 30 pointing at local group/versions
apiServices := []*apiregistration.APIService{newRemoteAPIService(apiServiceName)}
apiServices := []runtime.Object{newRemoteAPIService(apiServiceName)}
for i := 0; i < 30; i++ {
apiServices = append(apiServices, newLocalAPIService(fmt.Sprintf("local.group%d", i)))
}
@ -153,7 +161,7 @@ func BenchmarkBuildCache(b *testing.B) {
for i := 0; i < 100; i++ {
services = append(services, newService("foo", fmt.Sprintf("bar%d", i), testServicePort, testServicePortName))
}
c, _ := setupAPIServices(apiServices)
c, _ := setupAPIServices(b, apiServices)
b.ReportAllocs()
b.ResetTimer()
for n := 1; n <= b.N; n++ {
@ -174,7 +182,7 @@ func TestBuildCache(t *testing.T) {
name string
apiServiceName string
apiServices []*apiregistration.APIService
apiServices []runtime.Object
services []*v1.Service
endpoints []*v1.Endpoints
@ -183,13 +191,13 @@ func TestBuildCache(t *testing.T) {
{
name: "api service",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
c, fakeClient := setupAPIServices(tc.apiServices)
c, fakeClient := setupAPIServices(t, tc.apiServices)
for _, svc := range tc.services {
c.addService(svc)
}
@ -209,18 +217,20 @@ func TestSync(t *testing.T) {
name string
apiServiceName string
apiServices []*apiregistration.APIService
apiServices []runtime.Object
services []*v1.Service
endpoints []*v1.Endpoints
backendStatus int
backendLocation string
expectedAvailability apiregistration.APIServiceCondition
expectedSyncError string
expectedSkipped bool
}{
{
name: "local",
apiServiceName: "local.group",
apiServices: []*apiregistration.APIService{newLocalAPIService("local.group")},
apiServices: []runtime.Object{newLocalAPIService("local.group")},
backendStatus: http.StatusOK,
expectedAvailability: apiregistration.APIServiceCondition{
Type: apiregistration.Available,
@ -228,11 +238,12 @@ func TestSync(t *testing.T) {
Reason: "Local",
Message: "Local APIServices are always available",
},
expectedSkipped: true,
},
{
name: "no service",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "not-bar", testServicePort, testServicePortName)},
backendStatus: http.StatusOK,
expectedAvailability: apiregistration.APIServiceCondition{
@ -245,7 +256,7 @@ func TestSync(t *testing.T) {
{
name: "service on bad port",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{{
ObjectMeta: metav1.ObjectMeta{Namespace: "foo", Name: "bar"},
Spec: v1.ServiceSpec{
@ -267,7 +278,7 @@ func TestSync(t *testing.T) {
{
name: "no endpoints",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
backendStatus: http.StatusOK,
expectedAvailability: apiregistration.APIServiceCondition{
@ -280,7 +291,7 @@ func TestSync(t *testing.T) {
{
name: "missing endpoints",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
endpoints: []*v1.Endpoints{newEndpoints("foo", "bar")},
backendStatus: http.StatusOK,
@ -294,7 +305,7 @@ func TestSync(t *testing.T) {
{
name: "wrong endpoint port name",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, "wrongName")},
backendStatus: http.StatusOK,
@ -308,7 +319,7 @@ func TestSync(t *testing.T) {
{
name: "remote",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)},
backendStatus: http.StatusOK,
@ -322,7 +333,7 @@ func TestSync(t *testing.T) {
{
name: "remote-bad-return",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)},
backendStatus: http.StatusForbidden,
@ -332,11 +343,12 @@ func TestSync(t *testing.T) {
Reason: "FailedDiscoveryCheck",
Message: `failing or missing response from`,
},
expectedSyncError: "failing or missing response from",
},
{
name: "remote-redirect",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)},
backendStatus: http.StatusFound,
@ -347,11 +359,12 @@ func TestSync(t *testing.T) {
Reason: "FailedDiscoveryCheck",
Message: `failing or missing response from`,
},
expectedSyncError: "failing or missing response from",
},
{
name: "remote-304",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
apiServices: []runtime.Object{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)},
backendStatus: http.StatusNotModified,
@ -361,12 +374,13 @@ func TestSync(t *testing.T) {
Reason: "FailedDiscoveryCheck",
Message: `failing or missing response from`,
},
expectedSyncError: "failing or missing response from",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
fakeClient := fake.NewSimpleClientset(tc.apiServices...)
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
@ -395,9 +409,25 @@ func TestSync(t *testing.T) {
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
serviceResolver: &fakeServiceResolver{url: testServer.URL},
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
metrics: newAvailabilityMetrics(),
metrics: availabilitymetrics.New(),
}
err := c.sync(tc.apiServiceName)
if tc.expectedSyncError != "" {
if err == nil {
t.Fatalf("%v expected error with %q, got none", tc.name, tc.expectedSyncError)
} else if !strings.Contains(err.Error(), tc.expectedSyncError) {
t.Fatalf("%v expected error with %q, got %q", tc.name, tc.expectedSyncError, err.Error())
}
} else if err != nil {
t.Fatalf("%v unexpected sync error: %v", tc.name, err)
}
if tc.expectedSkipped {
if len(fakeClient.Actions()) > 0 {
t.Fatalf("%v expected no actions, got %v", tc.name, fakeClient.Actions())
}
return
}
c.sync(tc.apiServiceName)
// ought to have one action writing status
if e, a := 1, len(fakeClient.Actions()); e != a {
@ -444,19 +474,23 @@ func TestUpdateAPIServiceStatus(t *testing.T) {
foo := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "foo"}}}}
bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}}
fakeClient := fake.NewSimpleClientset()
fakeClient := fake.NewSimpleClientset(foo)
c := AvailableConditionController{
apiServiceClient: fakeClient.ApiregistrationV1().(apiregistrationclient.APIServicesGetter),
metrics: newAvailabilityMetrics(),
metrics: availabilitymetrics.New(),
}
c.updateAPIServiceStatus(foo, foo)
if _, err := c.updateAPIServiceStatus(foo, foo); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if e, a := 0, len(fakeClient.Actions()); e != a {
t.Error(dump.Pretty(fakeClient.Actions()))
}
fakeClient.ClearActions()
c.updateAPIServiceStatus(foo, bar)
if _, err := c.updateAPIServiceStatus(foo, bar); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if e, a := 1, len(fakeClient.Actions()); e != a {
t.Error(dump.Pretty(fakeClient.Actions()))
}

View File

@ -51,7 +51,8 @@ var (
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
"[+]poststarthook/apiservice-registration-controller ok",
"[+]poststarthook/apiservice-status-available-controller ok",
"[+]poststarthook/apiservice-status-local-available-controller ok",
"[+]poststarthook/apiservice-status-remote-available-controller ok",
"[+]poststarthook/kube-apiserver-autoregistration ok",
"[+]autoregister-completion ok",
"[+]poststarthook/apiservice-openapi-controller ok",
@ -72,7 +73,8 @@ var (
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
"[+]poststarthook/apiservice-registration-controller ok",
"[+]poststarthook/apiservice-status-available-controller ok",
"[+]poststarthook/apiservice-status-local-available-controller ok",
"[+]poststarthook/apiservice-status-remote-available-controller ok",
"[+]poststarthook/kube-apiserver-autoregistration ok",
"[+]autoregister-completion ok",
"[+]poststarthook/apiservice-openapi-controller ok",
@ -94,7 +96,8 @@ var (
"[+]poststarthook/start-cluster-authentication-info-controller ok",
"[+]poststarthook/start-kube-aggregator-informers ok",
"[+]poststarthook/apiservice-registration-controller ok",
"[+]poststarthook/apiservice-status-available-controller ok",
"[+]poststarthook/apiservice-status-local-available-controller ok",
"[+]poststarthook/apiservice-status-remote-available-controller ok",
"[+]poststarthook/kube-apiserver-autoregistration ok",
"[+]autoregister-completion ok",
"[+]poststarthook/apiservice-openapi-controller ok",

View File

@ -57,11 +57,11 @@ func TestAPIServerTransportMetrics(t *testing.T) {
// IMPORTANT: reflect the current values if the test changes
// client_test.go:1407: metric rest_client_transport_cache_entries 3
// client_test.go:1407: metric rest_client_transport_create_calls_total{result="hit"} 61
// client_test.go:1407: metric rest_client_transport_create_calls_total{result="hit"} 20
// client_test.go:1407: metric rest_client_transport_create_calls_total{result="miss"} 3
hits1, misses1, entries1 := checkTransportMetrics(t, client)
// hit ratio at startup depends on multiple factors
if (hits1*100)/(hits1+misses1) < 90 {
if (hits1*100)/(hits1+misses1) < 85 {
t.Fatalf("transport cache hit ratio %d lower than 90 percent", (hits1*100)/(hits1+misses1))
}
@ -114,7 +114,7 @@ func TestAPIServerTransportMetrics(t *testing.T) {
}
// hit ratio after startup should grow since no new transports are expected
if (hits2*100)/(hits2+misses2) < 95 {
if (hits2*100)/(hits2+misses2) < 85 {
t.Fatalf("transport cache hit ratio %d lower than 95 percent", (hits2*100)/(hits2+misses2))
}
}