fix race in aggregated discovery handler

Caused by the following sequence:

1. Add APIService to map
2. Begin Async Fetch
3. Remove APIService from map
4. Finish Async Fetch & store the APIService back into the map to update its lastReconciled time

Fixed by removing lastReconciled (it was only used for testing) and switching the tests to wait until the dirty queue is empty.
Alexander Zielenski 2023-01-24 14:23:57 -08:00
parent 14549722e4
commit cff4d07581
2 changed files with 35 additions and 42 deletions
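The sequence above is the whole bug: step 4's write-back happens with a stale copy of the per-service info, so it silently re-creates the entry that step 3 deleted. A minimal, self-contained Go sketch of that interleaving (hypothetical, simplified names; not the kube-aggregator types):

package main

import (
	"fmt"
	"sync"
	"time"
)

type manager struct {
	mu       sync.Mutex
	services map[string]time.Time // APIService name -> lastReconciled
}

func (m *manager) add(name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.services[name] = time.Time{}
}

func (m *manager) remove(name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.services, name)
}

// syncOne mimics the buggy worker: it fetches asynchronously, then stores the
// result back into the map to record lastReconciled -- even if the entry was
// removed while the fetch was in flight.
func (m *manager) syncOne(name string, done chan<- struct{}) {
	time.Sleep(10 * time.Millisecond) // simulate the remote discovery fetch
	m.mu.Lock()
	m.services[name] = time.Now() // resurrects a deleted entry: the race
	m.mu.Unlock()
	close(done)
}

func main() {
	m := &manager{services: map[string]time.Time{}}
	done := make(chan struct{})

	m.add("v1.stable.example.com")              // 1. Add APIService to map
	go m.syncOne("v1.stable.example.com", done) // 2. Begin async fetch
	m.remove("v1.stable.example.com")           // 3. Remove APIService from map
	<-done                                      // 4. Fetch finishes and writes back

	fmt.Println(len(m.services)) // prints 1; the removed service came back
}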


@@ -61,14 +61,10 @@ type DiscoveryAggregationController interface {
 	// Spwans a worker which waits for added/updated apiservices and updates
 	// the unified discovery document by contacting the aggregated api services
 	Run(stopCh <-chan struct{})
-
-	// Returns true if all non-local APIServices that have been added
-	// are synced at least once to the discovery document
-	ExternalServicesSynced() bool
 }
 
 type discoveryManager struct {
-	// Locks `services`
+	// Locks `apiServices`
 	servicesLock sync.RWMutex
 
 	// Map from APIService's name (or a unique string for local servers)
@@ -147,9 +143,6 @@ type groupVersionInfo struct {
 	// was stored, the discovery document will always be re-fetched.
 	lastMarkedDirty time.Time
 
-	// Last time sync function was run for this GV.
-	lastReconciled time.Time
-
 	// ServiceReference of this GroupVersion. This identifies the Service which
 	// describes how to contact the server responsible for this GroupVersion.
 	service serviceKey
@@ -350,12 +343,8 @@ func (dm *discoveryManager) syncAPIService(apiServiceName string) error {
 	}
 
 	// Lookup last cached result for this apiservice's service.
-	now := time.Now()
 	cached, err := dm.fetchFreshDiscoveryForService(mgv, info)
-	info.lastReconciled = now
-	dm.setInfoForAPIService(apiServiceName, &info)
 
 	var entry apidiscoveryv2beta1.APIVersionDiscovery
 	// Extract the APIService's specific resource information from the
@@ -477,18 +466,6 @@ func (dm *discoveryManager) RemoveAPIService(apiServiceName string) {
 	}
 }
 
-func (dm *discoveryManager) ExternalServicesSynced() bool {
-	dm.servicesLock.RLock()
-	defer dm.servicesLock.RUnlock()
-
-	for _, info := range dm.apiServices {
-		if info.lastReconciled.IsZero() {
-			return false
-		}
-	}
-
-	return true
-}
 //
 // Lock-protected accessors
 //
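Dropping lastReconciled does double duty: ExternalServicesSynced loses the only signal it polled, and syncAPIService no longer has any reason to write the fetched info back into the shared map. The sync path becomes read-only with respect to apiServices, so a concurrent RemoveAPIService can no longer be undone. A sketch of that fixed shape (hypothetical simplified types in the spirit of the toy example above; not the actual handler code):

package main

import (
	"fmt"
	"sync"
)

type fixedManager struct {
	mu       sync.Mutex
	services map[string]struct{} // presence only; no lastReconciled to update
	merged   map[string]string   // stand-in for the aggregated discovery document
}

// syncOne reads a snapshot under the lock, fetches without holding it, and
// publishes only to the merged document. Nothing is written back into
// services, so a concurrent remove cannot be silently reverted.
func (m *fixedManager) syncOne(name string, fetch func(string) string) {
	m.mu.Lock()
	_, ok := m.services[name]
	m.mu.Unlock()
	if !ok {
		return // removed while queued; nothing to do
	}

	doc := fetch(name) // the slow remote call happens lock-free

	m.mu.Lock()
	m.merged[name] = doc
	m.mu.Unlock()
}

func main() {
	m := &fixedManager{
		services: map[string]struct{}{"v1.stable.example.com": {}},
		merged:   map[string]string{},
	}
	m.syncOne("v1.stable.example.com", func(string) string { return "doc" })
	fmt.Println(len(m.services), len(m.merged)) // 1 1; services map untouched
}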


@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package apiserver_test
+package apiserver
 
 import (
 	"context"
@@ -37,9 +37,21 @@ import (
 	scheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/cache"
 	apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
-	"k8s.io/kube-aggregator/pkg/apiserver"
 )
 
+func newDiscoveryManager(rm discoveryendpoint.ResourceManager) *discoveryManager {
+	return NewDiscoveryManager(rm).(*discoveryManager)
+}
+
+// Returns true if the queue of services to sync is empty, which means everything
+// has been reconciled and placed into the merged document
+func waitForEmptyQueue(stopCh <-chan struct{}, dm *discoveryManager) bool {
+	return cache.WaitForCacheSync(stopCh, func() bool {
+		// Once items have successfully synced they are removed from queue.
+		return dm.dirtyAPIServiceQueue.Len() == 0
+	})
+}
+
 // Test that the discovery manager starts and aggregates from two local API services
 func TestBasic(t *testing.T) {
 	service1 := discoveryendpoint.NewResourceManager()
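The replacement works because cache.WaitForCacheSync is not informer-specific: it simply polls each supplied func() bool (roughly every 100ms) until all return true or stopCh closes, so "dirty queue is empty" is as valid a condition as "informer cache synced". A standalone usage sketch (the timing condition is a stand-in, not from the commit):

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/tools/cache"
)

func main() {
	stopCh := make(chan struct{})
	start := time.Now()

	// WaitForCacheSync blocks, re-checking the condition until it reports
	// true (or stopCh closes, in which case it returns false).
	ok := cache.WaitForCacheSync(stopCh, func() bool {
		return time.Since(start) > 300*time.Millisecond // stand-in condition
	})
	fmt.Println("condition reached:", ok)
}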
@@ -49,7 +61,7 @@ func TestBasic(t *testing.T) {
 	service1.SetGroups(apiGroup1.Items)
 	service2.SetGroups(apiGroup2.Items)
 	aggregatedResourceManager := discoveryendpoint.NewResourceManager()
-	aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager)
+	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
 
 	for _, g := range apiGroup1.Items {
 		for _, v := range g.Versions {
@@ -85,10 +97,12 @@ func TestBasic(t *testing.T) {
 		}
 	}
 
-	testCtx, _ := context.WithCancel(context.Background())
+	testCtx, testCancel := context.WithCancel(context.Background())
+	defer testCancel()
+
 	go aggregatedManager.Run(testCtx.Done())
 
-	cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)
+	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
 
 	response, _, parsed := fetchPath(aggregatedResourceManager, "")
 	if response.StatusCode != 200 {
@@ -123,7 +137,8 @@ func TestDirty(t *testing.T) {
 	service := discoveryendpoint.NewResourceManager()
 	aggregatedResourceManager := discoveryendpoint.NewResourceManager()
 
-	aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager)
+	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
+
 	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "v1.stable.example.com",
@@ -143,7 +158,7 @@ func TestDirty(t *testing.T) {
 	defer cancel()
 	go aggregatedManager.Run(testCtx.Done())
 
-	cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)
+	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
 
 	// immediately check for ping, since Run() should block for local services
 	if !pinged {
@@ -180,20 +195,22 @@ func TestRemoveAPIService(t *testing.T) {
 		}
 	}
 
-	aggregatedManager := apiserver.NewDiscoveryManager(aggyService)
+	aggregatedManager := newDiscoveryManager(aggyService)
+
 	for _, s := range apiServices {
 		aggregatedManager.AddAPIService(s, service)
 	}
 
-	testCtx, _ := context.WithCancel(context.Background())
+	testCtx, testCancel := context.WithCancel(context.Background())
+	defer testCancel()
+
 	go aggregatedManager.Run(testCtx.Done())
 
 	for _, s := range apiServices {
 		aggregatedManager.RemoveAPIService(s.Name)
 	}
 
-	cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced)
+	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
 
 	response, _, parsed := fetchPath(aggyService, "")
 	if response.StatusCode != 200 {
@@ -246,7 +263,7 @@ func TestLegacyFallback(t *testing.T) {
 		}
 	}))
 
-	aggregatedManager := apiserver.NewDiscoveryManager(aggregatedResourceManager)
+	aggregatedManager := newDiscoveryManager(aggregatedResourceManager)
 	aggregatedManager.AddAPIService(&apiregistrationv1.APIService{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "v1.stable.example.com",
@@ -275,7 +292,7 @@ func TestLegacyFallback(t *testing.T) {
 	defer cancel()
 	go aggregatedManager.Run(testCtx.Done())
 
-	require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced))
+	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
 
 	// At this point external services have synced. Check if discovery document
 	// includes the legacy resources
@@ -329,8 +346,7 @@ func TestNotModified(t *testing.T) {
 		}
 	}
 
-	aggregatedManager := apiserver.NewDiscoveryManager(aggyService)
-
+	aggregatedManager := newDiscoveryManager(aggyService)
 	// Add all except the last group.
 	// Ensure this is done BEFORE the call to run, so they are included in initial
 	// list to keep test focused
@@ -345,10 +361,10 @@ func TestNotModified(t *testing.T) {
 	// Important to wait here to ensure we prime the cache with the initial list
 	// of documents in order to exercise 304 Not Modified
-	require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced))
+	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
 
-	// Now add all groups. We excluded one group before so that ExternalServicesSynced
-	// could include it in this round. Now, if ExternalServicesSynced ever returns
+	// Now add all groups. We excluded one group before so that AllServicesSynced
+	// could include it in this round. Now, if AllServicesSynced ever returns
 	// true, it must have synced all the pre-existing groups before, which would
 	// return 304 Not Modified
 	for _, s := range apiServices {
@@ -356,7 +372,7 @@ func TestNotModified(t *testing.T) {
 	}
 
 	// This would wait the full timeout on 1.26.0.
-	require.True(t, cache.WaitForCacheSync(testCtx.Done(), aggregatedManager.ExternalServicesSynced))
+	require.True(t, waitForEmptyQueue(testCtx.Done(), aggregatedManager))
 }
 
 // copied from staging/src/k8s.io/apiserver/pkg/endpoints/discovery/v2/handler_test.go