Merge pull request #115302 from alexzielenski/apiserver/discovery/lastreconciled-race

fix race in aggregated discovery controller
This commit is contained in:
Kubernetes Prow Robot 2023-01-25 23:12:24 -08:00 committed by GitHub
commit eb4e2a2225
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 42 deletions

View File

@ -61,14 +61,10 @@ type DiscoveryAggregationController interface {
// Spwans a worker which waits for added/updated apiservices and updates // Spwans a worker which waits for added/updated apiservices and updates
// the unified discovery document by contacting the aggregated api services // the unified discovery document by contacting the aggregated api services
Run(stopCh <-chan struct{}) Run(stopCh <-chan struct{})
// Returns true if all non-local APIServices that have been added
// are synced at least once to the discovery document
ExternalServicesSynced() bool
} }
type discoveryManager struct { type discoveryManager struct {
// Locks `services` // Locks `apiServices`
servicesLock sync.RWMutex servicesLock sync.RWMutex
// Map from APIService's name (or a unique string for local servers) // Map from APIService's name (or a unique string for local servers)
@ -147,9 +143,6 @@ type groupVersionInfo struct {
// was stored, the discovery document will always be re-fetched. // was stored, the discovery document will always be re-fetched.
lastMarkedDirty time.Time lastMarkedDirty time.Time
// Last time sync function was run for this GV.
lastReconciled time.Time
// ServiceReference of this GroupVersion. This identifies the Service which // ServiceReference of this GroupVersion. This identifies the Service which
// describes how to contact the server responsible for this GroupVersion. // describes how to contact the server responsible for this GroupVersion.
service serviceKey service serviceKey
@ -350,12 +343,8 @@ func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
} }
// Lookup last cached result for this apiservice's service. // Lookup last cached result for this apiservice's service.
now := time.Now()
cached, err := dm.fetchFreshDiscoveryForService(mgv, info) cached, err := dm.fetchFreshDiscoveryForService(mgv, info)
info.lastReconciled = now
dm.setInfoForAPIService(apiServiceName, &info)
var entry apidiscoveryv2beta1.APIVersionDiscovery var entry apidiscoveryv2beta1.APIVersionDiscovery
// Extract the APIService's specific resource information from the // Extract the APIService's specific resource information from the
@ -477,18 +466,6 @@ func (dm *discoveryManager) RemoveAPIService(apiServiceName string) {
} }
} }
func (dm *discoveryManager) ExternalServicesSynced() bool {
dm.servicesLock.RLock()
defer dm.servicesLock.RUnlock()
for _, info := range dm.apiServices {
if info.lastReconciled.IsZero() {
return false
}
}
return true
}
// //
// Lock-protected accessors // Lock-protected accessors
// //

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package apiserver_test package apiserver
import ( import (
"context" "context"
@ -37,9 +37,21 @@ import (
scheme "k8s.io/client-go/kubernetes/scheme" scheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-aggregator/pkg/apiserver"
) )
func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager {
return NewDiscoveryManager(rm).(*discoveryManager)
}
// Returns true if the queue of services to sync empty this means everything has
// been reconciled and placed into merged document
func waitForEmptyQueue(stopCh <-chan struct{}, dm *discoveryManager) bool {
return cache.WaitForCacheSync(stopCh, func() bool {
// Once items have successfully synced they are removed from queue.
return dm.dirtyAPIServiceQueue.Len() == 0
})
}
// Test that the discovery manager starts and aggregates from two local API services // Test that the discovery manager starts and aggregates from two local API services
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
service1 := discoveryendpoint.NewResourceManager() service1 := discoveryendpoint.NewResourceManager()
@ -49,7 +61,7 @@ func TestBasic(t *testing.T) {
service1.SetGroups(apiGroup1.Items) service1.SetGroups(apiGroup1.Items)
service2.SetGroups(apiGroup2.Items) service2.SetGroups(apiGroup2.Items)
aggregatedResourceManager := discoveryendpoint.NewResourceManager() aggregatedResourceManager := discoveryendpoint.NewResourceManager()
aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
for _, g := range apiGroup1.Items { for _, g := range apiGroup1.Items {
for _, v := range g.Versions { for _, v := range g.Versions {
@ -85,10 +97,12 @@ func TestBasic(t *testing.T) {
} }
} }
testCtx, _ := context.WithCancel(context.Background()) testCtx, testCancel := context.WithCancel(context.Background())
defer testCancel()
go aggregatedManager.Run(testCtx.Done()) go aggregatedManager.Run(testCtx.Done())
cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
response, _, parsed := fetchPath(aggregatedResourceManager, "") response, _, parsed := fetchPath(aggregatedResourceManager, "")
if response.StatusCode != 200 { if response.StatusCode != 200 {
@ -123,7 +137,8 @@ func TestDirty(t *testing.T) {
service := discoveryendpoint.NewResourceManager() service := discoveryendpoint.NewResourceManager()
aggregatedResourceManager := discoveryendpoint.NewResourceManager() aggregatedResourceManager := discoveryendpoint.NewResourceManager()
aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "v1.stable.example.com", Name: "v1.stable.example.com",
@ -143,7 +158,7 @@ func TestDirty(t *testing.T) {
defer cancel() defer cancel()
go aggregatedManager.Run(testCtx.Done()) go aggregatedManager.Run(testCtx.Done())
cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
// immediately check for ping, since Run() should block for local services // immediately check for ping, since Run() should block for local services
if !pinged { if !pinged {
@ -180,20 +195,22 @@ func TestRemoveAPIService(t *testing.T) {
} }
} }
aggregatedManager := apiserver.NewDiscoveryManager(aggyService) aggregatedManager := newDiscoveryManager(aggyService)
for _, s := range apiServices { for _, s := range apiServices {
aggregatedManager.AddAPIService(s, service) aggregatedManager.AddAPIService(s, service)
} }
testCtx, _ := context.WithCancel(context.Background()) testCtx, testCancel := context.WithCancel(context.Background())
defer testCancel()
go aggregatedManager.Run(testCtx.Done()) go aggregatedManager.Run(testCtx.Done())
for _, s := range apiServices { for _, s := range apiServices {
aggregatedManager.RemoveAPIService(s.Name) aggregatedManager.RemoveAPIService(s.Name)
} }
cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced) require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
response, _, parsed := fetchPath(aggyService, "") response, _, parsed := fetchPath(aggyService, "")
if response.StatusCode != 200 { if response.StatusCode != 200 {
@ -246,7 +263,7 @@ func TestLegacyFallback(t *testing.T) {
} }
})) }))
aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager) aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
aggregatedManager.AddAPIService(&apiregistrationv1.APIService{ aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: "v1.stable.example.com", Name: "v1.stable.example.com",
@ -275,7 +292,7 @@ func TestLegacyFallback(t *testing.T) {
defer cancel() defer cancel()
go aggregatedManager.Run(testCtx.Done()) go aggregatedManager.Run(testCtx.Done())
require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)) require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
// At this point external services have synced. Check if discovery document // At this point external services have synced. Check if discovery document
// includes the legacy resources // includes the legacy resources
@ -329,8 +346,7 @@ func TestNotModified(t *testing.T) {
} }
} }
aggregatedManager := apiserver.NewDiscoveryManager(aggyService) aggregatedManager := newDiscoveryManager(aggyService)
// Add all except the last group. // Add all except the last group.
// Ensure this is done BEFORE the call to run, so they are included in initial // Ensure this is done BEFORE the call to run, so they are included in initial
// list to keep test focused // list to keep test focused
@ -345,10 +361,10 @@ func TestNotModified(t *testing.T) {
// Important to wait here to ensure we prime the cache with the initial list // Important to wait here to ensure we prime the cache with the initial list
// of documents in order to exercise 304 Not Modified // of documents in order to exercise 304 Not Modified
require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)) require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
// Now add all groups. We excluded one group before so that ExternalServicesSynced // Now add all groups. We excluded one group before so that AllServicesSynced
// could include it in this round. Now, if ExternalServicesSynced ever returns // could include it in this round. Now, if AllServicesSynced ever returns
// true, it must have synced all the pre-existing groups before, which would // true, it must have synced all the pre-existing groups before, which would
// return 304 Not Modified // return 304 Not Modified
for _, s := range apiServices { for _, s := range apiServices {
@ -356,7 +372,7 @@ func TestNotModified(t *testing.T) {
} }
// This would wait the full timeout on 1.26.0. // This would wait the full timeout on 1.26.0.
require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)) require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
} }
// copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go // copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go