Merge pull request #115630 from Jefftree/agg-discovery-metrics

Add metrics for aggregated discovery
This commit is contained in:
Kubernetes Prow Robot 2023-03-10 07:44:41 -08:00 committed by GitHub
commit 2e3c5003b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 182 additions and 36 deletions

View File

@ -473,7 +473,7 @@ func buildGenericConfig(
genericConfig.FlowControl, lastErr = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
}
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager()
genericConfig.AggregatedDiscoveryGroupManager = aggregated.NewResourceManager("apis")
}
return

View File

@ -54,7 +54,7 @@ func ServeHTTPWithETag(
// Otherwise, we delegate to the handler for actual content
//
// According to documentation, An Etag within an If-None-Match
// header will be enclosed within doule quotes:
// header will be enclosed within double quotes:
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/If-None-Match#directives
if clientCachedHash := req.Header.Get("If-None-Match"); quotedHash == clientCachedHash {
w.WriteHeader(http.StatusNotModified)

View File

@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
"sync/atomic"
@ -72,6 +73,8 @@ type resourceDiscoveryManager struct {
// cache is an atomic pointer to avoid the use of locks
cache atomic.Pointer[cachedGroupList]
serveHTTPFunc http.HandlerFunc
// Writes protected by the lock.
// List of all apigroups & resources indexed by the resource manager
lock sync.RWMutex
@ -84,13 +87,26 @@ type priorityInfo struct {
VersionPriority int
}
func NewResourceManager() ResourceManager {
func NewResourceManager(path string) ResourceManager {
scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme)
utilruntime.Must(apidiscoveryv2beta1.AddToScheme(scheme))
return &resourceDiscoveryManager{serializer: codecs, versionPriorities: make(map[metav1.GroupVersion]priorityInfo)}
rdm := &resourceDiscoveryManager{
serializer: codecs,
versionPriorities: make(map[metav1.GroupVersion]priorityInfo),
}
rdm.serveHTTPFunc = metrics.InstrumentHandlerFunc("GET",
/* group = */ "",
/* version = */ "",
/* resource = */ "",
/* subresource = */ path,
/* scope = */ "",
/* component = */ metrics.APIServerComponent,
/* deprecated */ false,
/* removedRelease */ "",
rdm.serveHTTP)
return rdm
}
func (rdm *resourceDiscoveryManager) SetGroupVersionPriority(gv metav1.GroupVersion, groupPriorityMinimum, versionPriority int) {
rdm.lock.Lock()
defer rdm.lock.Unlock()
@ -246,6 +262,7 @@ func (rdm *resourceDiscoveryManager) RemoveGroup(groupName string) {
// Prepares the api group list for serving by converting them from map into
// list and sorting them according to insertion order
func (rdm *resourceDiscoveryManager) calculateAPIGroupsLocked() []apidiscoveryv2beta1.APIGroupDiscovery {
regenerationCounter.Inc()
// Re-order the apiGroups by their priority.
groups := []apidiscoveryv2beta1.APIGroupDiscovery{}
for _, group := range rdm.apiGroups {
@ -338,6 +355,10 @@ type cachedGroupList struct {
}
func (rdm *resourceDiscoveryManager) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
rdm.serveHTTPFunc(resp, req)
}
func (rdm *resourceDiscoveryManager) serveHTTP(resp http.ResponseWriter, req *http.Request) {
cache := rdm.fetchFromCache()
response := cache.cachedResponse
etag := cache.cachedResponseETag

View File

@ -120,7 +120,7 @@ func fetchPath(handler http.Handler, acceptPrefix string, path string, etag stri
// Add all builtin APIServices to the manager and check the output
func TestBasicResponse(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
@ -141,7 +141,7 @@ func TestBasicResponse(t *testing.T) {
// Test that protobuf is outputted correctly
func TestBasicResponseProtobuf(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
@ -157,8 +157,8 @@ func TestBasicResponseProtobuf(t *testing.T) {
// e.g.: Multiple services with the same contents should have the same etag.
func TestEtagConsistent(t *testing.T) {
// Create 2 managers, add a bunch of services to each
manager1 := discoveryendpoint.NewResourceManager()
manager2 := discoveryendpoint.NewResourceManager()
manager1 := discoveryendpoint.NewResourceManager("apis")
manager2 := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 11)
manager1.SetGroups(apis.Items)
@ -231,7 +231,7 @@ func TestEtagConsistent(t *testing.T) {
// Test that if a request comes in with an If-None-Match header with an incorrect
// E-Tag, that fresh content is returned.
func TestEtagNonMatching(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 12)
manager.SetGroups(apis.Items)
@ -251,7 +251,7 @@ func TestEtagNonMatching(t *testing.T) {
// Test that if a request comes in with an If-None-Match header with a correct
// E-Tag, that 304 Not Modified is returned
func TestEtagMatching(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 12)
manager.SetGroups(apis.Items)
@ -273,7 +273,7 @@ func TestEtagMatching(t *testing.T) {
// Test that if a request comes in with an If-None-Match header with an old
// E-Tag, that fresh content is returned
func TestEtagOutdated(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 15)
manager.SetGroups(apis.Items)
@ -301,7 +301,7 @@ func TestEtagOutdated(t *testing.T) {
// Test that an api service can be added or removed
func TestAddRemove(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
@ -331,7 +331,7 @@ func TestAddRemove(t *testing.T) {
// Show that updating an existing service replaces and does not add the entry
// and instead replaces it
func TestUpdateService(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 15)
for _, group := range apis.Items {
for _, version := range group.Versions {
@ -368,7 +368,7 @@ func TestUpdateService(t *testing.T) {
// Show the discovery manager is capable of serving requests to multiple users
// with unchanging data
func TestConcurrentRequests(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 15)
manager.SetGroups(apis.Items)
@ -410,7 +410,7 @@ func TestConcurrentRequests(t *testing.T) {
// concurrent writers without tripping up. Good to run with go '-race' detector
// since there are not many "correctness" checks
func TestAbuse(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
numReaders := 100
numRequestsPerReader := 1000
@ -505,7 +505,7 @@ func TestAbuse(t *testing.T) {
}
func TestVersionSortingNoPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1alpha1",
@ -537,7 +537,7 @@ func TestVersionSortingNoPriority(t *testing.T) {
}
func TestVersionSortingWithPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
@ -560,7 +560,7 @@ func TestVersionSortingWithPriority(t *testing.T) {
// if two apiservices declare conflicting priorities for their group priority, take the higher one.
func TestGroupVersionSortingConflictingPriority(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
manager.AddGroupVersion("default", apidiscoveryv2beta1.APIVersionDiscovery{
Version: "v1",
@ -588,7 +588,7 @@ func TestGroupVersionSortingConflictingPriority(t *testing.T) {
// Show that the GroupPriorityMinimum is not sticky if a higher group version is removed
// after a lower one is added
func TestStatelessGroupPriorityMinimum(t *testing.T) {
manager := discoveryendpoint.NewResourceManager()
manager := discoveryendpoint.NewResourceManager("apis")
stableGroup := "stable.example.com"
experimentalGroup := "experimental.example.com"

View File

@ -0,0 +1,36 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated
import (
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
var (
regenerationCounter = metrics.NewCounter(
&metrics.CounterOpts{
Name: "aggregator_discovery_aggregation_count_total",
Help: "Counter of number of times discovery was aggregated",
StabilityLevel: metrics.ALPHA,
},
)
)
func init() {
legacyregistry.MustRegister(regenerationCounter)
}

View File

@ -0,0 +1,89 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package aggregated_test
import (
"fmt"
"io"
"strings"
"testing"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/metrics/testutil"
discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
)
func formatExpectedMetrics(aggregationCount int) io.Reader {
expected := ``
if aggregationCount > 0 {
expected = expected + `# HELP aggregator_discovery_aggregation_count_total [ALPHA] Counter of number of times discovery was aggregated
# TYPE aggregator_discovery_aggregation_count_total counter
aggregator_discovery_aggregation_count_total %d
`
}
args := []any{}
if aggregationCount > 0 {
args = append(args, aggregationCount)
}
return strings.NewReader(fmt.Sprintf(expected, args...))
}
func TestBasicMetrics(t *testing.T) {
legacyregistry.Reset()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
interests := []string{"aggregator_discovery_aggregation_count_total"}
_, _, _ = fetchPath(manager, "application/json", discoveryPath, "")
// A single fetch should aggregate and increment regeneration counter.
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil {
t.Fatal(err)
}
_, _, _ = fetchPath(manager, "application/json", discoveryPath, "")
// Subsequent fetches should not reaggregate discovery.
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil {
t.Fatal(err)
}
}
func TestMetricsModified(t *testing.T) {
legacyregistry.Reset()
manager := discoveryendpoint.NewResourceManager("apis")
apis := fuzzAPIGroups(1, 3, 10)
manager.SetGroups(apis.Items)
interests := []string{"aggregator_discovery_aggregation_count_total"}
_, _, _ = fetchPath(manager, "application/json", discoveryPath, "")
// A single fetch should aggregate and increment regeneration counter.
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(1), interests...); err != nil {
t.Fatal(err)
}
// Update discovery document.
manager.SetGroups(fuzzAPIGroups(1, 3, 10).Items)
_, _, _ = fetchPath(manager, "application/json", discoveryPath, "")
// If the discovery content has changed, reaggregation should be performed.
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, formatExpectedMetrics(2), interests...); err != nil {
t.Fatal(err)
}
}

View File

@ -738,10 +738,10 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
manager := c.AggregatedDiscoveryGroupManager
if manager == nil {
manager = discoveryendpoint.NewResourceManager()
manager = discoveryendpoint.NewResourceManager("apis")
}
s.AggregatedDiscoveryGroupManager = manager
s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager()
s.AggregatedLegacyDiscoveryGroupManager = discoveryendpoint.NewResourceManager("api")
}
for {
if c.JSONPatchMaxCopyBytes <= 0 {

View File

@ -60,13 +60,13 @@ func waitForQueueComplete(stopCh <-chan struct{}, dm *discoveryManager) bool {
// Test that the discovery manager starts and aggregates from two local API services
func TestBasic(t *testing.T) {
service1 := discoveryendpoint.NewResourceManager()
service2 := discoveryendpoint.NewResourceManager()
service1 := discoveryendpoint.NewResourceManager("apis")
service2 := discoveryendpoint.NewResourceManager("apis")
apiGroup1 := fuzzAPIGroups(2, 5, 25)
apiGroup2 := fuzzAPIGroups(2, 5, 50)
service1.SetGroups(apiGroup1.Items)
service2.SetGroups(apiGroup2.Items)
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
for _, g := range apiGroup1.Items {
@ -140,8 +140,8 @@ func checkAPIGroups(t *testing.T, api apidiscoveryv2beta1.APIGroupDiscoveryList,
// APIService has been marked as dirty
func TestDirty(t *testing.T) {
var pinged atomic.Bool
service := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
service := discoveryendpoint.NewResourceManager("apis")
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
@ -176,8 +176,8 @@ func TestDirty(t *testing.T) {
// complete by artificially making the sync handler take a long time
func TestWaitForSync(t *testing.T) {
pinged := atomic.Bool{}
service := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
service := discoveryendpoint.NewResourceManager("apis")
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
@ -212,8 +212,8 @@ func TestWaitForSync(t *testing.T) {
// Show that an APIService can be removed and that its group no longer remains
// if there are no versions
func TestRemoveAPIService(t *testing.T) {
aggyService := discoveryendpoint.NewResourceManager()
service := discoveryendpoint.NewResourceManager()
aggyService := discoveryendpoint.NewResourceManager("apis")
service := discoveryendpoint.NewResourceManager("apis")
apiGroup := fuzzAPIGroups(2, 3, 10)
service.SetGroups(apiGroup.Items)
@ -265,7 +265,7 @@ func TestRemoveAPIService(t *testing.T) {
}
func TestLegacyFallbackNoCache(t *testing.T) {
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
@ -436,7 +436,7 @@ func (a byVersion) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byVersion) Less(i, j int) bool { return versionMap[a[i].Version] < versionMap[a[j].Version] }
func TestLegacyFallback(t *testing.T) {
aggregatedResourceManager := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager("apis")
rootAPIsHandler := discovery.NewRootAPIsHandler(discovery.DefaultAddresses{DefaultAddress: "192.168.1.1"}, scheme.Codecs)
legacyGroupHandler := discovery.NewAPIGroupHandler(scheme.Codecs, metav1.APIGroup{
@ -534,8 +534,8 @@ func TestLegacyFallback(t *testing.T) {
// This path in 1.26.0 would result in a deadlock if an aggregated APIService
// returned a 304 Not Modified response for its own aggregated discovery document.
func TestNotModified(t *testing.T) {
aggyService := discoveryendpoint.NewResourceManager()
service := discoveryendpoint.NewResourceManager()
aggyService := discoveryendpoint.NewResourceManager("apis")
service := discoveryendpoint.NewResourceManager("apis")
apiGroup := fuzzAPIGroups(2, 3, 10)
service.SetGroups(apiGroup.Items)

View File

@ -199,7 +199,7 @@ func TestAggregatedAPIServiceDiscovery(t *testing.T) {
defer cleanup()
// Create a resource manager whichs serves our GroupVersion
resourceManager := discoveryendpoint.NewResourceManager()
resourceManager := discoveryendpoint.NewResourceManager("apis")
resourceManager.SetGroups([]apidiscoveryv2beta1.APIGroupDiscovery{basicTestGroup})
// Install our ResourceManager as an Aggregated APIService to the