Merge pull request #50335 from ironcladlou/gc-discovery-optimization

Automatic merge from submit-queue (batch tested with PRs 49642, 50335, 50390, 49283, 46582)

Improve GC discovery sync performance

Improve GC discovery sync performance by only syncing when discovered
resource diffs are detected. Before, the GC worker pool was shut down
and monitors resynced unconditionally every sync period, leading to
significant processing delays causing test flakes where otherwise
reasonable GC timeouts were being exceeded.

Related to https://github.com/kubernetes/kubernetes/issues/49966.

/cc @kubernetes/sig-api-machinery-bugs

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-08-10 00:53:19 -07:00 committed by GitHub
commit 9bbcd4af60
5 changed files with 61 additions and 19 deletions

View File

@ -340,7 +340,7 @@ func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
// Periodically refresh the RESTMapper with new discovery information and sync // Periodically refresh the RESTMapper with new discovery information and sync
// the garbage collector. // the garbage collector.
go garbageCollector.Sync(restMapper, discoveryClient, 30*time.Second, ctx.Stop) go garbageCollector.Sync(gcClientset.Discovery(), 30*time.Second, ctx.Stop)
return true, nil return true, nil
} }

View File

@ -18,6 +18,7 @@ package garbagecollector
import ( import (
"fmt" "fmt"
"reflect"
"sync" "sync"
"time" "time"
@ -59,7 +60,7 @@ const ResourceResyncTime time.Duration = 0
// ensures that the garbage collector operates with a graph that is at least as // ensures that the garbage collector operates with a graph that is at least as
// up to date as the notification is sent. // up to date as the notification is sent.
type GarbageCollector struct { type GarbageCollector struct {
restMapper meta.RESTMapper restMapper resettableRESTMapper
// clientPool uses the regular dynamicCodec. We need it to update // clientPool uses the regular dynamicCodec. We need it to update
// finalizers. It can be removed if we support patching finalizers. // finalizers. It can be removed if we support patching finalizers.
clientPool dynamic.ClientPool clientPool dynamic.ClientPool
@ -81,7 +82,7 @@ type GarbageCollector struct {
func NewGarbageCollector( func NewGarbageCollector(
metaOnlyClientPool dynamic.ClientPool, metaOnlyClientPool dynamic.ClientPool,
clientPool dynamic.ClientPool, clientPool dynamic.ClientPool,
mapper meta.RESTMapper, mapper resettableRESTMapper,
deletableResources map[schema.GroupVersionResource]struct{}, deletableResources map[schema.GroupVersionResource]struct{},
ignoredResources map[schema.GroupResource]struct{}, ignoredResources map[schema.GroupResource]struct{},
sharedInformers informers.SharedInformerFactory, sharedInformers informers.SharedInformerFactory,
@ -162,25 +163,59 @@ type resettableRESTMapper interface {
Reset() Reset()
} }
// Sync periodically resyncs the garbage collector monitors with resources // Sync periodically resyncs the garbage collector when new resources are
// returned found via the discoveryClient. Sync blocks, continuing to sync until // observed from discovery. When new resources are detected, Sync will stop all
// a message is received on stopCh. // GC workers, reset gc.restMapper, and resync the monitors.
// //
// The discoveryClient should be the same client which underlies restMapper. // Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
func (gc *GarbageCollector) Sync(restMapper resettableRESTMapper, discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) { // the mapper's underlying discovery client will be unnecessarily reset during
// the course of detecting new resources.
func (gc *GarbageCollector) Sync(discoveryClient discovery.DiscoveryInterface, period time.Duration, stopCh <-chan struct{}) {
oldResources := make(map[schema.GroupVersionResource]struct{})
wait.Until(func() { wait.Until(func() {
// Get the current resource list from discovery.
newResources, err := GetDeletableResources(discoveryClient)
if err != nil {
utilruntime.HandleError(err)
return
}
// Detect first or abnormal sync and try again later.
if oldResources == nil || len(oldResources) == 0 {
oldResources = newResources
return
}
// Decide whether discovery has reported a change.
if reflect.DeepEqual(oldResources, newResources) {
glog.V(5).Infof("no resource updates from discovery, skipping garbage collector sync")
return
}
// Something has changed, so track the new state and perform a sync.
glog.V(2).Infof("syncing garbage collector with updated resources from discovery: %v", newResources)
oldResources = newResources
// Ensure workers are paused to avoid processing events before informers // Ensure workers are paused to avoid processing events before informers
// have resynced. // have resynced.
gc.workerLock.Lock() gc.workerLock.Lock()
defer gc.workerLock.Unlock() defer gc.workerLock.Unlock()
restMapper.Reset() // Resetting the REST mapper will also invalidate the underlying discovery
deletableResources, err := GetDeletableResources(discoveryClient) // client. This is a leaky abstraction and assumes behavior about the REST
if err != nil { // mapper, but we'll deal with it for now.
utilruntime.HandleError(err) gc.restMapper.Reset()
return
} // Perform the monitor resync and wait for controllers to report cache sync.
if err := gc.resyncMonitors(deletableResources); err != nil { //
// NOTE: It's possible that newResources will diverge from the resources
// discovered by restMapper during the call to Reset, since they are
// distinct discovery clients invalidated at different times. For example,
// newResources may contain resources not returned in the restMapper's
// discovery call if the resources appeared inbetween the calls. In that
// case, the restMapper will fail to map some of newResources until the next
// sync period.
if err := gc.resyncMonitors(newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err)) utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
return return
} }

View File

@ -46,11 +46,17 @@ import (
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
) )
type testRESTMapper struct {
meta.RESTMapper
}
func (_ *testRESTMapper) Reset() {}
func TestGarbageCollectorConstruction(t *testing.T) { func TestGarbageCollectorConstruction(t *testing.T) {
config := &restclient.Config{} config := &restclient.Config{}
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
tweakableRM := meta.NewDefaultRESTMapper(nil, nil) tweakableRM := meta.NewDefaultRESTMapper(nil, nil)
rm := meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()} rm := &testRESTMapper{meta.MultiRESTMapper{tweakableRM, api.Registry.RESTMapper()}}
metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) metaOnlyClientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
config.ContentConfig.NegotiatedSerializer = nil config.ContentConfig.NegotiatedSerializer = nil
clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc) clientPool := dynamic.NewClientPool(config, rm, dynamic.LegacyAPIPathResolverFunc)
@ -168,7 +174,7 @@ func setupGC(t *testing.T, config *restclient.Config) garbageCollector {
podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}} podResource := map[schema.GroupVersionResource]struct{}{{Version: "v1", Resource: "pods"}: {}}
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(client, 0) sharedInformers := informers.NewSharedInformerFactory(client, 0)
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, api.Registry.RESTMapper(), podResource, ignoredResources, sharedInformers) gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, &testRESTMapper{api.Registry.RESTMapper()}, podResource, ignoredResources, sharedInformers)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -283,7 +283,8 @@ func newCronJob(name, schedule string) *batchv2alpha1.CronJob {
Completions: &completions, Completions: &completions,
Template: v1.PodTemplateSpec{ Template: v1.PodTemplateSpec{
Spec: v1.PodSpec{ Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure, RestartPolicy: v1.RestartPolicyOnFailure,
TerminationGracePeriodSeconds: &zero,
Containers: []v1.Container{ Containers: []v1.Container{
{ {
Name: "c", Name: "c",

View File

@ -265,7 +265,7 @@ func setup(t *testing.T, workerCount int) *testContext {
syncPeriod := 5 * time.Second syncPeriod := 5 * time.Second
startGC := func(workers int) { startGC := func(workers int) {
go gc.Run(workers, stopCh) go gc.Run(workers, stopCh)
go gc.Sync(restMapper, discoveryClient, syncPeriod, stopCh) go gc.Sync(clientSet.Discovery(), syncPeriod, stopCh)
} }
if workerCount > 0 { if workerCount > 0 {