mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 06:27:05 +00:00
Merge pull request #31167 from caesarxuchao/gc-absent-owner-cache
Automatic merge from submit-queue [GarbageCollector] add absent owner cache <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, read our contributor guidelines https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md and developer guide https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md 2. If you want *faster* PR reviews, read how: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/faster_reviews.md 3. Follow the instructions for writing a release note: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes --> **What this PR does / why we need it**: Reducing the Request sent to the API server by the garbage collector to check if an owner exists. **Which issue this PR fixes** *(optional, in `fixes #<issue number>(, #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes # #26120 **Special notes for your reviewer**: **Release note**: <!-- Steps to write your release note: 1. Use the release-note-* labels to set the release note state (if you have access) 2. Enter your extended release note in the below block; leaving it blank means using the PR title as the release note. If no release note is required, just write `NONE`. --> ```release-note ``` Currently when processing an item in the dirtyQueue, the garbage collector issues GET to check if any of its owners exist. If the owner is a replication controller with 1000 pods, the garbage collector sends a GET for the RC 1000 times. This PR caches the owner's UID if it does not exist according to the API server. This cuts 1/3 of the garbage collection time of the density test in the gce-500 and gce-scale, where the QPS is the bottleneck.
This commit is contained in:
commit
8f431e4af8
@ -443,6 +443,8 @@ type GarbageCollector struct {
|
|||||||
clock clock.Clock
|
clock clock.Clock
|
||||||
registeredRateLimiter *RegisteredRateLimiter
|
registeredRateLimiter *RegisteredRateLimiter
|
||||||
registeredRateLimiterForMonitors *RegisteredRateLimiter
|
registeredRateLimiterForMonitors *RegisteredRateLimiter
|
||||||
|
// GC caches the owners that do not exist according to the API server.
|
||||||
|
absentOwnerCache *UIDCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch {
|
func gcListWatcher(client *dynamic.Client, resource unversioned.GroupVersionResource) *cache.ListWatch {
|
||||||
@ -543,6 +545,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
|
|||||||
orphanQueue: workqueue.NewTimedWorkQueue(),
|
orphanQueue: workqueue.NewTimedWorkQueue(),
|
||||||
registeredRateLimiter: NewRegisteredRateLimiter(resources),
|
registeredRateLimiter: NewRegisteredRateLimiter(resources),
|
||||||
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources),
|
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources),
|
||||||
|
absentOwnerCache: NewUIDCache(100),
|
||||||
}
|
}
|
||||||
gc.propagator = &Propagator{
|
gc.propagator = &Propagator{
|
||||||
eventQueue: workqueue.NewTimedWorkQueue(),
|
eventQueue: workqueue.NewTimedWorkQueue(),
|
||||||
@ -708,6 +711,10 @@ func (gc *GarbageCollector) processItem(item *node) error {
|
|||||||
// TODO: we need to remove dangling references if the object is not to be
|
// TODO: we need to remove dangling references if the object is not to be
|
||||||
// deleted.
|
// deleted.
|
||||||
for _, reference := range ownerReferences {
|
for _, reference := range ownerReferences {
|
||||||
|
if gc.absentOwnerCache.Has(reference.UID) {
|
||||||
|
glog.V(6).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
// TODO: we need to verify the reference resource is supported by the
|
// TODO: we need to verify the reference resource is supported by the
|
||||||
// system. If it's not a valid resource, the garbage collector should i)
|
// system. If it's not a valid resource, the garbage collector should i)
|
||||||
// ignore the reference when decide if the object should be deleted, and
|
// ignore the reference when decide if the object should be deleted, and
|
||||||
@ -727,11 +734,13 @@ func (gc *GarbageCollector) processItem(item *node) error {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
if owner.GetUID() != reference.UID {
|
if owner.GetUID() != reference.UID {
|
||||||
glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
||||||
|
gc.absentOwnerCache.Add(reference.UID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID)
|
glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID)
|
||||||
return nil
|
return nil
|
||||||
} else if errors.IsNotFound(err) {
|
} else if errors.IsNotFound(err) {
|
||||||
|
gc.absentOwnerCache.Add(reference.UID)
|
||||||
glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name)
|
||||||
} else {
|
} else {
|
||||||
return err
|
return err
|
||||||
|
@ -105,34 +105,51 @@ func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request))
|
|||||||
return srv, config
|
return srv, config
|
||||||
}
|
}
|
||||||
|
|
||||||
func newDanglingPod() *v1.Pod {
|
func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector {
|
||||||
|
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
||||||
|
metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
|
||||||
|
config.ContentConfig.NegotiatedSerializer = nil
|
||||||
|
clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
|
||||||
|
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
|
||||||
|
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return gc
|
||||||
|
}
|
||||||
|
|
||||||
|
func getPod(podName string, ownerReferences []v1.OwnerReference) *v1.Pod {
|
||||||
return &v1.Pod{
|
return &v1.Pod{
|
||||||
TypeMeta: unversioned.TypeMeta{
|
TypeMeta: unversioned.TypeMeta{
|
||||||
Kind: "Pod",
|
Kind: "Pod",
|
||||||
APIVersion: "v1",
|
APIVersion: "v1",
|
||||||
},
|
},
|
||||||
ObjectMeta: v1.ObjectMeta{
|
ObjectMeta: v1.ObjectMeta{
|
||||||
Name: "ToBeDeletedPod",
|
Name: podName,
|
||||||
Namespace: "ns1",
|
Namespace: "ns1",
|
||||||
OwnerReferences: []v1.OwnerReference{
|
OwnerReferences: ownerReferences,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func serilizeOrDie(t *testing.T, object interface{}) []byte {
|
||||||
|
data, err := json.Marshal(object)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
// test the processItem function making the expected actions.
|
||||||
|
func TestProcessItem(t *testing.T) {
|
||||||
|
pod := getPod("ToBeDeletedPod", []v1.OwnerReference{
|
||||||
{
|
{
|
||||||
Kind: "ReplicationController",
|
Kind: "ReplicationController",
|
||||||
Name: "owner1",
|
Name: "owner1",
|
||||||
UID: "123",
|
UID: "123",
|
||||||
APIVersion: "v1",
|
APIVersion: "v1",
|
||||||
},
|
},
|
||||||
},
|
})
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// test the processItem function making the expected actions.
|
|
||||||
func TestProcessItem(t *testing.T) {
|
|
||||||
pod := newDanglingPod()
|
|
||||||
podBytes, err := json.Marshal(pod)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
testHandler := &fakeActionHandler{
|
testHandler := &fakeActionHandler{
|
||||||
response: map[string]FakeResponse{
|
response: map[string]FakeResponse{
|
||||||
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/owner1": {
|
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/owner1": {
|
||||||
@ -141,21 +158,13 @@ func TestProcessItem(t *testing.T) {
|
|||||||
},
|
},
|
||||||
"GET" + "/api/v1/namespaces/ns1/pods/ToBeDeletedPod": {
|
"GET" + "/api/v1/namespaces/ns1/pods/ToBeDeletedPod": {
|
||||||
200,
|
200,
|
||||||
podBytes,
|
serilizeOrDie(t, pod),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
|
|
||||||
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
|
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
|
||||||
defer srv.Close()
|
defer srv.Close()
|
||||||
clientConfig.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
gc := setupGC(t, clientConfig)
|
||||||
metaOnlyClientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
|
|
||||||
clientConfig.ContentConfig.NegotiatedSerializer = nil
|
|
||||||
clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
|
|
||||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
item := &node{
|
item := &node{
|
||||||
identity: objectReference{
|
identity: objectReference{
|
||||||
OwnerReference: metatypes.OwnerReference{
|
OwnerReference: metatypes.OwnerReference{
|
||||||
@ -169,7 +178,7 @@ func TestProcessItem(t *testing.T) {
|
|||||||
// owners are intentionally left empty. The processItem routine should get the latest item from the server.
|
// owners are intentionally left empty. The processItem routine should get the latest item from the server.
|
||||||
owners: nil,
|
owners: nil,
|
||||||
}
|
}
|
||||||
err = gc.processItem(item)
|
err := gc.processItem(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected Error: %v", err)
|
t.Errorf("Unexpected Error: %v", err)
|
||||||
}
|
}
|
||||||
@ -304,16 +313,7 @@ func TestProcessEvent(t *testing.T) {
|
|||||||
// TestDependentsRace relies on golang's data race detector to check if there is
|
// TestDependentsRace relies on golang's data race detector to check if there is
|
||||||
// data race among in the dependents field.
|
// data race among in the dependents field.
|
||||||
func TestDependentsRace(t *testing.T) {
|
func TestDependentsRace(t *testing.T) {
|
||||||
config := &restclient.Config{}
|
gc := setupGC(t, &restclient.Config{})
|
||||||
config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()}
|
|
||||||
metaOnlyClientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
|
|
||||||
config.ContentConfig.NegotiatedSerializer = nil
|
|
||||||
clientPool := dynamic.NewClientPool(config, dynamic.LegacyAPIPathResolverFunc)
|
|
||||||
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
|
|
||||||
gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, podResource)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
const updates = 100
|
const updates = 100
|
||||||
owner := &node{dependents: make(map[*node]struct{})}
|
owner := &node{dependents: make(map[*node]struct{})}
|
||||||
@ -358,3 +358,116 @@ func TestGCListWatcher(t *testing.T) {
|
|||||||
t.Errorf("expect %s, got %s", e, a)
|
t.Errorf("expect %s, got %s", e, a)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func podToGCNode(pod *v1.Pod) *node {
|
||||||
|
return &node{
|
||||||
|
identity: objectReference{
|
||||||
|
OwnerReference: metatypes.OwnerReference{
|
||||||
|
Kind: pod.Kind,
|
||||||
|
APIVersion: pod.APIVersion,
|
||||||
|
Name: pod.Name,
|
||||||
|
UID: pod.UID,
|
||||||
|
},
|
||||||
|
Namespace: pod.Namespace,
|
||||||
|
},
|
||||||
|
// owners are intentionally left empty. The processItem routine should get the latest item from the server.
|
||||||
|
owners: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAbsentUIDCache(t *testing.T) {
|
||||||
|
rc1Pod1 := getPod("rc1Pod1", []v1.OwnerReference{
|
||||||
|
{
|
||||||
|
Kind: "ReplicationController",
|
||||||
|
Name: "rc1",
|
||||||
|
UID: "1",
|
||||||
|
APIVersion: "v1",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
rc1Pod2 := getPod("rc1Pod2", []v1.OwnerReference{
|
||||||
|
{
|
||||||
|
Kind: "ReplicationController",
|
||||||
|
Name: "rc1",
|
||||||
|
UID: "1",
|
||||||
|
APIVersion: "v1",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
rc2Pod1 := getPod("rc2Pod1", []v1.OwnerReference{
|
||||||
|
{
|
||||||
|
Kind: "ReplicationController",
|
||||||
|
Name: "rc2",
|
||||||
|
UID: "2",
|
||||||
|
APIVersion: "v1",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
rc3Pod1 := getPod("rc3Pod1", []v1.OwnerReference{
|
||||||
|
{
|
||||||
|
Kind: "ReplicationController",
|
||||||
|
Name: "rc3",
|
||||||
|
UID: "3",
|
||||||
|
APIVersion: "v1",
|
||||||
|
},
|
||||||
|
})
|
||||||
|
testHandler := &fakeActionHandler{
|
||||||
|
response: map[string]FakeResponse{
|
||||||
|
"GET" + "/api/v1/namespaces/ns1/pods/rc1Pod1": {
|
||||||
|
200,
|
||||||
|
serilizeOrDie(t, rc1Pod1),
|
||||||
|
},
|
||||||
|
"GET" + "/api/v1/namespaces/ns1/pods/rc1Pod2": {
|
||||||
|
200,
|
||||||
|
serilizeOrDie(t, rc1Pod2),
|
||||||
|
},
|
||||||
|
"GET" + "/api/v1/namespaces/ns1/pods/rc2Pod1": {
|
||||||
|
200,
|
||||||
|
serilizeOrDie(t, rc2Pod1),
|
||||||
|
},
|
||||||
|
"GET" + "/api/v1/namespaces/ns1/pods/rc3Pod1": {
|
||||||
|
200,
|
||||||
|
serilizeOrDie(t, rc3Pod1),
|
||||||
|
},
|
||||||
|
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc1": {
|
||||||
|
404,
|
||||||
|
[]byte{},
|
||||||
|
},
|
||||||
|
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc2": {
|
||||||
|
404,
|
||||||
|
[]byte{},
|
||||||
|
},
|
||||||
|
"GET" + "/api/v1/namespaces/ns1/replicationcontrollers/rc3": {
|
||||||
|
404,
|
||||||
|
[]byte{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
|
||||||
|
defer srv.Close()
|
||||||
|
gc := setupGC(t, clientConfig)
|
||||||
|
gc.absentOwnerCache = NewUIDCache(2)
|
||||||
|
gc.processItem(podToGCNode(rc1Pod1))
|
||||||
|
gc.processItem(podToGCNode(rc2Pod1))
|
||||||
|
// rc1 should already be in the cache, no request should be sent. rc1 should be promoted in the UIDCache
|
||||||
|
gc.processItem(podToGCNode(rc1Pod2))
|
||||||
|
// after this call, rc2 should be evicted from the UIDCache
|
||||||
|
gc.processItem(podToGCNode(rc3Pod1))
|
||||||
|
// check cache
|
||||||
|
if !gc.absentOwnerCache.Has(types.UID("1")) {
|
||||||
|
t.Errorf("expected rc1 to be in the cache")
|
||||||
|
}
|
||||||
|
if gc.absentOwnerCache.Has(types.UID("2")) {
|
||||||
|
t.Errorf("expected rc2 to not exist in the cache")
|
||||||
|
}
|
||||||
|
if !gc.absentOwnerCache.Has(types.UID("3")) {
|
||||||
|
t.Errorf("expected rc3 to be in the cache")
|
||||||
|
}
|
||||||
|
// check the request sent to the server
|
||||||
|
count := 0
|
||||||
|
for _, action := range testHandler.actions {
|
||||||
|
if action.String() == "GET=/api/v1/namespaces/ns1/replicationcontrollers/rc1" {
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count != 1 {
|
||||||
|
t.Errorf("expected only 1 GET rc1 request, got %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
52
pkg/controller/garbagecollector/uid_cache.go
Normal file
52
pkg/controller/garbagecollector/uid_cache.go
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package garbagecollector
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/golang/groupcache/lru"
|
||||||
|
"k8s.io/kubernetes/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// UIDCache is an LRU cache for uid.
|
||||||
|
type UIDCache struct {
|
||||||
|
mutex sync.Mutex
|
||||||
|
cache *lru.Cache
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUIDCache returns a UIDCache.
|
||||||
|
func NewUIDCache(maxCacheEntries int) *UIDCache {
|
||||||
|
return &UIDCache{
|
||||||
|
cache: lru.New(maxCacheEntries),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add adds a uid to the cache.
|
||||||
|
func (c *UIDCache) Add(uid types.UID) {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
c.cache.Add(uid, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Has returns if a uid is in the cache.
|
||||||
|
func (c *UIDCache) Has(uid types.UID) bool {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
_, found := c.cache.Get(uid)
|
||||||
|
return found
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user