gc changes

This commit is contained in:
Chao Xu 2017-02-23 11:16:13 -08:00
parent 93686da104
commit c3baf402f5
10 changed files with 1324 additions and 751 deletions

View File

@ -191,7 +191,7 @@ func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
// Note that ValidateOwnerReferences() will reject this patch if another
// OwnerReference exists with controller=true.
addControllerPatch := fmt.Sprintf(
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`,
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.controller.GetName(), m.controller.GetUID(), pod.UID)
return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch))

View File

@ -483,6 +483,9 @@ func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *
if controllerRef.Controller == nil || *controllerRef.Controller != true {
return fmt.Errorf("controllerRef.Controller is not set")
}
if controllerRef.BlockOwnerDeletion == nil || *controllerRef.BlockOwnerDeletion != true {
return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set")
}
return r.createPods("", namespace, template, controllerObject, controllerRef)
}

File diff suppressed because it is too large Load Diff

View File

@ -19,6 +19,7 @@ package garbagecollector
import (
"net/http"
"net/http/httptest"
"reflect"
"strings"
"sync"
"testing"
@ -27,15 +28,16 @@ import (
_ "k8s.io/kubernetes/pkg/api/install"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/util/clock"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
@ -53,7 +55,7 @@ func TestNewGarbageCollector(t *testing.T) {
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 1, len(gc.monitors))
assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors))
}
// fakeAction records information about requests to aid in testing.
@ -142,8 +144,8 @@ func serilizeOrDie(t *testing.T, object interface{}) []byte {
return data
}
// test the processItem function making the expected actions.
func TestProcessItem(t *testing.T) {
// test the attemptToDeleteItem function making the expected actions.
func TestAttemptToDeleteItem(t *testing.T) {
pod := getPod("ToBeDeletedPod", []metav1.OwnerReference{
{
Kind: "ReplicationController",
@ -177,10 +179,10 @@ func TestProcessItem(t *testing.T) {
},
Namespace: pod.Namespace,
},
// owners are intentionally left empty. The processItem routine should get the latest item from the server.
// owners are intentionally left empty. The attemptToDeleteItem routine should get the latest item from the server.
owners: nil,
}
err := gc.processItem(item)
err := gc.attemptToDeleteItem(item)
if err != nil {
t.Errorf("Unexpected Error: %v", err)
}
@ -249,7 +251,7 @@ func TestProcessEvent(t *testing.T) {
var testScenarios = []struct {
name string
// a series of events that will be supplied to the
// Propagator.eventQueue.
// GraphBuilder.eventQueue.
events []event
}{
{
@ -293,22 +295,19 @@ func TestProcessEvent(t *testing.T) {
}
for _, scenario := range testScenarios {
propagator := &Propagator{
eventQueue: workqueue.NewTimedWorkQueue(),
dependencyGraphBuilder := &GraphBuilder{
graphChanges: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
},
gc: &GarbageCollector{
dirtyQueue: workqueue.NewTimedWorkQueue(),
clock: clock.RealClock{},
absentOwnerCache: NewUIDCache(2),
uidToNodeLock: sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
},
attemptToDelete: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
absentOwnerCache: NewUIDCache(2),
}
for i := 0; i < len(scenario.events); i++ {
propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: propagator.gc.clock.Now(), Object: &scenario.events[i]})
propagator.processEvent()
verifyGraphInvariants(scenario.name, propagator.uidToNode.uidToNode, t)
dependencyGraphBuilder.graphChanges.Add(&scenario.events[i])
dependencyGraphBuilder.processGraphChanges()
verifyGraphInvariants(scenario.name, dependencyGraphBuilder.uidToNode.uidToNode, t)
}
}
}
@ -321,18 +320,18 @@ func TestDependentsRace(t *testing.T) {
const updates = 100
owner := &node{dependents: make(map[*node]struct{})}
ownerUID := types.UID("owner")
gc.propagator.uidToNode.Write(owner)
gc.dependencyGraphBuilder.uidToNode.Write(owner)
go func() {
for i := 0; i < updates; i++ {
dependent := &node{}
gc.propagator.addDependentToOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}})
gc.propagator.removeDependentFromOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}})
gc.dependencyGraphBuilder.addDependentToOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}})
gc.dependencyGraphBuilder.removeDependentFromOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}})
}
}()
go func() {
gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: owner})
gc.attemptToOrphan.Add(owner)
for i := 0; i < updates; i++ {
gc.orphanFinalizer()
gc.attemptToOrphanWorker()
}
}()
}
@ -348,9 +347,13 @@ func TestGCListWatcher(t *testing.T) {
if err != nil {
t.Fatal(err)
}
lw := gcListWatcher(client, podResource)
lw.Watch(metav1.ListOptions{ResourceVersion: "1"})
lw.List(metav1.ListOptions{ResourceVersion: "1"})
lw := listWatcher(client, podResource)
if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil {
t.Fatal(err)
}
if _, err := lw.List(metav1.ListOptions{ResourceVersion: "1"}); err != nil {
t.Fatal(err)
}
if e, a := 2, len(testHandler.actions); e != a {
t.Errorf("expect %d requests, got %d", e, a)
}
@ -373,7 +376,7 @@ func podToGCNode(pod *v1.Pod) *node {
},
Namespace: pod.Namespace,
},
// owners are intentionally left empty. The processItem routine should get the latest item from the server.
// owners are intentionally left empty. The attemptToDeleteItem routine should get the latest item from the server.
owners: nil,
}
}
@ -447,12 +450,12 @@ func TestAbsentUIDCache(t *testing.T) {
defer srv.Close()
gc := setupGC(t, clientConfig)
gc.absentOwnerCache = NewUIDCache(2)
gc.processItem(podToGCNode(rc1Pod1))
gc.processItem(podToGCNode(rc2Pod1))
gc.attemptToDeleteItem(podToGCNode(rc1Pod1))
gc.attemptToDeleteItem(podToGCNode(rc2Pod1))
// rc1 should already be in the cache, no request should be sent. rc1 should be promoted in the UIDCache
gc.processItem(podToGCNode(rc1Pod2))
gc.attemptToDeleteItem(podToGCNode(rc1Pod2))
// after this call, rc2 should be evicted from the UIDCache
gc.processItem(podToGCNode(rc3Pod1))
gc.attemptToDeleteItem(podToGCNode(rc3Pod1))
// check cache
if !gc.absentOwnerCache.Has(types.UID("1")) {
t.Errorf("expected rc1 to be in the cache")
@ -474,3 +477,89 @@ func TestAbsentUIDCache(t *testing.T) {
t.Errorf("expected only 1 GET rc1 request, got %d", count)
}
}
func TestDeleteOwnerRefPatch(t *testing.T) {
original := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "100",
OwnerReferences: []metav1.OwnerReference{
{UID: "1"},
{UID: "2"},
{UID: "3"},
},
},
}
originalData := serilizeOrDie(t, original)
expected := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "100",
OwnerReferences: []metav1.OwnerReference{
{UID: "1"},
},
},
}
patch := deleteOwnerRefPatch("100", "2", "3")
patched, err := strategicpatch.StrategicMergePatch(originalData, patch, v1.Pod{})
if err != nil {
t.Fatal(err)
}
var got v1.Pod
if err := json.Unmarshal(patched, &got); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(expected, got) {
t.Errorf("expected: %#v,\ngot: %#v", expected, got)
}
}
func TestUnblockOwnerReference(t *testing.T) {
trueVar := true
falseVar := false
original := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "100",
OwnerReferences: []metav1.OwnerReference{
{UID: "1", BlockOwnerDeletion: &trueVar},
{UID: "2", BlockOwnerDeletion: &falseVar},
{UID: "3"},
},
},
}
originalData := serilizeOrDie(t, original)
expected := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "100",
OwnerReferences: []metav1.OwnerReference{
{UID: "1", BlockOwnerDeletion: &falseVar},
{UID: "2", BlockOwnerDeletion: &falseVar},
{UID: "3"},
},
},
}
accessor, err := meta.Accessor(&original)
if err != nil {
t.Fatal(err)
}
n := node{
owners: accessor.GetOwnerReferences(),
}
patch, err := n.patchToUnblockOwnerReferences()
if err != nil {
t.Fatal(err)
}
patched, err := strategicpatch.StrategicMergePatch(originalData, patch, v1.Pod{})
if err != nil {
t.Fatal(err)
}
var got v1.Pod
if err := json.Unmarshal(patched, &got); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(expected, got) {
t.Errorf("expected: %#v,\ngot: %#v", expected, got)
t.Errorf("expected: %#v,\ngot: %#v", expected.OwnerReferences, got.OwnerReferences)
for _, ref := range got.OwnerReferences {
t.Errorf("ref.UID=%s, ref.BlockOwnerDeletion=%v", ref.UID, *ref.BlockOwnerDeletion)
}
}
}

View File

@ -0,0 +1,159 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"fmt"
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
type objectReference struct {
metav1.OwnerReference
// This is needed by the dynamic client
Namespace string
}
func (s objectReference) String() string {
return fmt.Sprintf("[%s/%s, namespace: %s, name: %s, uid: %s]", s.APIVersion, s.Kind, s.Namespace, s.Name, s.UID)
}
// The single-threaded GraphBuilder.processEvent() is the sole writer of the
// nodes. The multi-threaded GarbageCollector.processItem() reads the nodes.
// WARNING: node has different locks on different fields. setters and getters
// use the respective locks, so the return values of the getters can be
// inconsistent.
type node struct {
identity objectReference
// dependents will be read by the orphan() routine, we need to protect it with a lock.
dependentsLock sync.RWMutex
// dependents are the nodes that have node.identity as a
// metadata.ownerReference.
dependents map[*node]struct{}
// this is set by processEvent() if the object has non-nil DeletionTimestamp
// and has the FinalizerDeleteDependents.
deletingDependents bool
deletingDependentsLock sync.RWMutex
// this records if the object's deletionTimestamp is non-nil.
beingDeleted bool
beingDeletedLock sync.RWMutex
// when processing an Update event, we need to compare the updated
// ownerReferences with the owners recorded in the graph.
owners []metav1.OwnerReference
}
// An object is on a one way trip to its final deletion if it starts being
// deleted, so we only provide a function to set beingDeleted to true.
func (n *node) markBeingDeleted() {
n.beingDeletedLock.Lock()
defer n.beingDeletedLock.Unlock()
n.beingDeleted = true
}
func (n *node) isBeingDeleted() bool {
n.beingDeletedLock.RLock()
defer n.beingDeletedLock.RUnlock()
return n.beingDeleted
}
func (n *node) markDeletingDependents() {
n.deletingDependentsLock.Lock()
defer n.deletingDependentsLock.Unlock()
n.deletingDependents = true
}
func (n *node) isDeletingDependents() bool {
n.deletingDependentsLock.RLock()
defer n.deletingDependentsLock.RUnlock()
return n.deletingDependents
}
func (ownerNode *node) addDependent(dependent *node) {
ownerNode.dependentsLock.Lock()
defer ownerNode.dependentsLock.Unlock()
ownerNode.dependents[dependent] = struct{}{}
}
func (ownerNode *node) deleteDependent(dependent *node) {
ownerNode.dependentsLock.Lock()
defer ownerNode.dependentsLock.Unlock()
delete(ownerNode.dependents, dependent)
}
func (ownerNode *node) dependentsLength() int {
ownerNode.dependentsLock.RLock()
defer ownerNode.dependentsLock.RUnlock()
return len(ownerNode.dependents)
}
// Note that this function does not provide any synchronization guarantees;
// items could be added to or removed from ownerNode.dependents the moment this
// function returns.
func (ownerNode *node) getDependents() []*node {
ownerNode.dependentsLock.RLock()
defer ownerNode.dependentsLock.RUnlock()
var ret []*node
for dep := range ownerNode.dependents {
ret = append(ret, dep)
}
return ret
}
// blockingDependents returns the dependents that are blocking the deletion of
// n, i.e., the dependent that has an ownerReference pointing to n, and
// the BlockOwnerDeletion field of that ownerReference is true.
// Note that this function does not provide any synchronization guarantees;
// items could be added to or removed from ownerNode.dependents the moment this
// function returns.
func (n *node) blockingDependents() []*node {
dependents := n.getDependents()
var ret []*node
for _, dep := range dependents {
for _, owner := range dep.owners {
if owner.UID == n.identity.UID && owner.BlockOwnerDeletion != nil && *owner.BlockOwnerDeletion {
ret = append(ret, dep)
}
}
}
return ret
}
type concurrentUIDToNode struct {
uidToNodeLock sync.RWMutex
uidToNode map[types.UID]*node
}
func (m *concurrentUIDToNode) Write(node *node) {
m.uidToNodeLock.Lock()
defer m.uidToNodeLock.Unlock()
m.uidToNode[node.identity.UID] = node
}
func (m *concurrentUIDToNode) Read(uid types.UID) (*node, bool) {
m.uidToNodeLock.RLock()
defer m.uidToNodeLock.RUnlock()
n, ok := m.uidToNode[uid]
return n, ok
}
func (m *concurrentUIDToNode) Delete(uid types.UID) {
m.uidToNodeLock.Lock()
defer m.uidToNodeLock.Unlock()
delete(m.uidToNode, uid)
}

View File

@ -0,0 +1,497 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"fmt"
"reflect"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type eventType int
const (
addEvent eventType = iota
updateEvent
deleteEvent
)
type event struct {
eventType eventType
obj interface{}
// the update event comes with an old object, but it's not used by the garbage collector.
oldObj interface{}
}
// GraphBuilder: based on the events supplied by the informers, GraphBuilder updates
// uidToNode, a graph that caches the dependencies as we know, and enqueues
// items to the attemptToDelete and attemptToOrphan.
type GraphBuilder struct {
restMapper meta.RESTMapper
// each monitor list/watches a resource, the results are funneled to the
// dependencyGraphBuilder
monitors []cache.Controller
// metaOnlyClientPool uses a special codec, which removes fields except for
// apiVersion, kind, and metadata during decoding.
metaOnlyClientPool dynamic.ClientPool
// used to register exactly once the rate limiters of the clients used by
// the `monitors`.
registeredRateLimiterForControllers *RegisteredRateLimiter
// monitors are the producer of the graphChanges queue, graphBuilder alters
// the in-memory graph according to the changes.
graphChanges workqueue.RateLimitingInterface
// uidToNode doesn't require a lock to protect, because only the
// single-threaded GraphBuilder.processGraphChanges() reads/writes it.
uidToNode *concurrentUIDToNode
// GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
attemptToDelete workqueue.RateLimitingInterface
attemptToOrphan workqueue.RateLimitingInterface
// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
// be non-existent are added to the cached.
absentOwnerCache *UIDCache
}
func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *cache.ListWatch {
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
// APIResource.Kind is not used by the dynamic client, so
// leave it empty. We want to list this resource in all
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource}
return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback).
Resource(&apiResource, metav1.NamespaceAll).
List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
// APIResource.Kind is not used by the dynamic client, so
// leave it empty. We want to list this resource in all
// namespaces if it's namespace scoped, so leave
// APIResource.Namespaced as false is all right.
apiResource := metav1.APIResource{Name: resource.Resource}
return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback).
Resource(&apiResource, metav1.NamespaceAll).
Watch(options)
},
}
}
func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, error) {
// TODO: consider store in one storage.
glog.V(5).Infof("create storage for resource %s", resource)
client, err := gb.metaOnlyClientPool.ClientForGroupVersionKind(kind)
if err != nil {
return nil, err
}
gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
setObjectTypeMeta := func(obj interface{}) {
runtimeObject, ok := obj.(runtime.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj))
}
runtimeObject.GetObjectKind().SetGroupVersionKind(kind)
}
_, monitor := cache.NewInformer(
listWatcher(client, resource),
nil,
ResourceResyncTime,
cache.ResourceEventHandlerFuncs{
// add the event to the dependencyGraphBuilder's graphChanges.
AddFunc: func(obj interface{}) {
setObjectTypeMeta(obj)
event := &event{
eventType: addEvent,
obj: obj,
}
gb.graphChanges.Add(event)
},
UpdateFunc: func(oldObj, newObj interface{}) {
setObjectTypeMeta(newObj)
// TODO: check if there are differences in the ownerRefs,
// finalizers, and DeletionTimestamp; if not, ignore the update.
event := &event{updateEvent, newObj, oldObj}
gb.graphChanges.Add(event)
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = deletedFinalStateUnknown.Obj
}
setObjectTypeMeta(obj)
event := &event{
eventType: deleteEvent,
obj: obj,
}
gb.graphChanges.Add(event)
},
},
)
return monitor, nil
}
func (gb *GraphBuilder) monitorsForResources(resources map[schema.GroupVersionResource]struct{}) error {
for resource := range resources {
if _, ok := ignoredResources[resource]; ok {
glog.V(5).Infof("ignore resource %#v", resource)
continue
}
kind, err := gb.restMapper.KindFor(resource)
if err != nil {
return err
}
monitor, err := gb.controllerFor(resource, kind)
if err != nil {
return err
}
gb.monitors = append(gb.monitors, monitor)
}
return nil
}
func (gb *GraphBuilder) HasSynced() bool {
for _, monitor := range gb.monitors {
if !monitor.HasSynced() {
return false
}
}
return true
}
func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
for _, monitor := range gb.monitors {
go monitor.Run(stopCh)
}
go wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
}
var ignoredResources = map[schema.GroupVersionResource]struct{}{
schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicationcontrollers"}: {},
schema.GroupVersionResource{Group: "", Version: "v1", Resource: "bindings"}: {},
schema.GroupVersionResource{Group: "", Version: "v1", Resource: "componentstatuses"}: {},
schema.GroupVersionResource{Group: "", Version: "v1", Resource: "events"}: {},
schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1beta1", Resource: "tokenreviews"}: {},
schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "subjectaccessreviews"}: {},
schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "selfsubjectaccessreviews"}: {},
schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {},
}
func (gb *GraphBuilder) enqueueChanges(e *event) {
gb.graphChanges.Add(e)
}
// addDependentToOwners adds n to owners' dependents list. If the owner does not
// exist in the gb.uidToNode yet, a "virtual" node will be created to represent
// the owner. The "virtual" node will be enqueued to the attemptToDelete, so that
// processItem() will verify if the owner exists according to the API server.
func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
for _, owner := range owners {
ownerNode, ok := gb.uidToNode.Read(owner.UID)
if !ok {
// Create a "virtual" node in the graph for the owner if it doesn't
// exist in the graph yet. Then enqueue the virtual node into the
// attemptToDelete. The garbage processor will enqueue a virtual delete
// event to delete it from the graph if API server confirms this
// owner doesn't exist.
ownerNode = &node{
identity: objectReference{
OwnerReference: owner,
Namespace: n.identity.Namespace,
},
dependents: make(map[*node]struct{}),
}
glog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
gb.uidToNode.Write(ownerNode)
gb.attemptToDelete.Add(ownerNode)
}
ownerNode.addDependent(n)
}
}
// insertNode insert the node to gb.uidToNode; then it finds all owners as listed
// in n.owners, and adds the node to their dependents list.
func (gb *GraphBuilder) insertNode(n *node) {
gb.uidToNode.Write(n)
gb.addDependentToOwners(n, n.owners)
}
// removeDependentFromOwners remove n from owners' dependents list.
func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
for _, owner := range owners {
ownerNode, ok := gb.uidToNode.Read(owner.UID)
if !ok {
continue
}
ownerNode.deleteDependent(n)
}
}
// removeNode removes the node from gb.uidToNode, then finds all
// owners as listed in n.owners, and removes n from their dependents list.
func (gb *GraphBuilder) removeNode(n *node) {
gb.uidToNode.Delete(n.identity.UID)
gb.removeDependentFromOwners(n, n.owners)
}
type ownerRefPair struct {
oldRef metav1.OwnerReference
newRef metav1.OwnerReference
}
// TODO: profile this function to see if a naive N^2 algorithm performs better
// when the number of references is small.
func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
oldUIDToRef := make(map[string]metav1.OwnerReference)
for i := 0; i < len(old); i++ {
oldUIDToRef[string(old[i].UID)] = old[i]
}
oldUIDSet := sets.StringKeySet(oldUIDToRef)
newUIDToRef := make(map[string]metav1.OwnerReference)
for i := 0; i < len(new); i++ {
newUIDToRef[string(new[i].UID)] = new[i]
}
newUIDSet := sets.StringKeySet(newUIDToRef)
addedUID := newUIDSet.Difference(oldUIDSet)
removedUID := oldUIDSet.Difference(newUIDSet)
intersection := oldUIDSet.Intersection(newUIDSet)
for uid := range addedUID {
added = append(added, newUIDToRef[uid])
}
for uid := range removedUID {
removed = append(removed, oldUIDToRef[uid])
}
for uid := range intersection {
if !reflect.DeepEqual(oldUIDToRef[uid], newUIDToRef[uid]) {
changed = append(changed, ownerRefPair{oldRef: oldUIDToRef[uid], newRef: newUIDToRef[uid]})
}
}
return added, removed, changed
}
// returns if the object in the event just transitions to "being deleted".
func deletionStarts(oldObj interface{}, newAccessor metav1.Object) bool {
// The delta_fifo may combine the creation and update of the object into one
// event, so if there is no oldObj, we just return if the newObj (via
// newAccessor) is being deleted.
if oldObj == nil {
if newAccessor.GetDeletionTimestamp() == nil {
return false
}
return true
}
oldAccessor, err := meta.Accessor(oldObj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
return false
}
return beingDeleted(newAccessor) && !beingDeleted(oldAccessor)
}
func beingDeleted(accessor metav1.Object) bool {
return accessor.GetDeletionTimestamp() != nil
}
func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
finalizers := accessor.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == metav1.FinalizerDeleteDependents {
return true
}
}
return false
}
func hasOrphanFinalizer(accessor metav1.Object) bool {
finalizers := accessor.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == metav1.FinalizerOrphanDependents {
return true
}
}
return false
}
// this function takes newAccessor directly because the caller already
// instantiates an accessor for the newObj.
func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
return deletionStarts(oldObj, newAccessor) && hasDeleteDependentsFinalizer(newAccessor)
}
// this function takes newAccessor directly because the caller already
// instantiates an accessor for the newObj.
func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
return deletionStarts(oldObj, newAccessor) && hasOrphanFinalizer(newAccessor)
}
// if an blocking ownerReference points to an object gets removed, or gets set to
// "BlockOwnerDeletion=false", add the object to the attemptToDelete queue.
func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) {
for _, ref := range removed {
if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
node, found := gb.uidToNode.Read(ref.UID)
if !found {
glog.V(5).Infof("cannot find %s in uidToNode", ref.UID)
continue
}
gb.attemptToDelete.Add(node)
}
}
for _, c := range changed {
wasBlocked := c.oldRef.BlockOwnerDeletion != nil && *c.oldRef.BlockOwnerDeletion
isUnblocked := c.newRef.BlockOwnerDeletion == nil || (c.newRef.BlockOwnerDeletion != nil && !*c.newRef.BlockOwnerDeletion)
if wasBlocked && isUnblocked {
node, found := gb.uidToNode.Read(c.newRef.UID)
if !found {
glog.V(5).Infof("cannot find %s in uidToNode", c.newRef.UID)
continue
}
gb.attemptToDelete.Add(node)
}
}
}
func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
glog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
gb.attemptToOrphan.Add(n)
return
}
if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
glog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
// if the n is added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
n.markDeletingDependents()
for dep := range n.dependents {
gb.attemptToDelete.Add(dep)
}
gb.attemptToDelete.Add(n)
}
}
func (gb *GraphBuilder) runProcessGraphChanges() {
for gb.processGraphChanges() {
}
}
// Dequeueing an event from graphChanges, updating graph, populating dirty_queue.
func (gb *GraphBuilder) processGraphChanges() bool {
item, quit := gb.graphChanges.Get()
if quit {
return false
}
defer gb.graphChanges.Done(item)
event, ok := item.(*event)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
return true
}
obj := event.obj
accessor, err := meta.Accessor(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return true
}
typeAccessor, err := meta.TypeAccessor(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
return true
}
glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
// Check if the node already exsits
existingNode, found := gb.uidToNode.Read(accessor.GetUID())
switch {
case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
newNode := &node{
identity: objectReference{
OwnerReference: metav1.OwnerReference{
APIVersion: typeAccessor.GetAPIVersion(),
Kind: typeAccessor.GetKind(),
UID: accessor.GetUID(),
Name: accessor.GetName(),
},
Namespace: accessor.GetNamespace(),
},
dependents: make(map[*node]struct{}),
owners: accessor.GetOwnerReferences(),
deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
beingDeleted: beingDeleted(accessor),
}
gb.insertNode(newNode)
// the underlying delta_fifo may combine a creation and a deletion into
// one event, so we need to further process the event.
gb.processTransitions(event.oldObj, accessor, newNode)
case (event.eventType == addEvent || event.eventType == updateEvent) && found:
// handle changes in ownerReferences
added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
// check if the changed dependency graph unblock owners that are
// waiting for the deletion of their dependents.
gb.addUnblockedOwnersToDeleteQueue(removed, changed)
// update the node itself
existingNode.owners = accessor.GetOwnerReferences()
// Add the node to its new owners' dependent lists.
gb.addDependentToOwners(existingNode, added)
// remove the node from the dependent list of node that are no longer in
// the node's owners list.
gb.removeDependentFromOwners(existingNode, removed)
}
if beingDeleted(accessor) {
existingNode.markBeingDeleted()
}
gb.processTransitions(event.oldObj, accessor, existingNode)
case event.eventType == deleteEvent:
if !found {
glog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
return true
}
// removeNode updates the graph
gb.removeNode(existingNode)
existingNode.dependentsLock.RLock()
defer existingNode.dependentsLock.RUnlock()
if len(existingNode.dependents) > 0 {
gb.absentOwnerCache.Add(accessor.GetUID())
}
for dep := range existingNode.dependents {
gb.attemptToDelete.Add(dep)
}
for _, owner := range existingNode.owners {
ownerNode, found := gb.uidToNode.Read(owner.UID)
if !found || !ownerNode.isDeletingDependents() {
continue
}
// this is to let attempToDeleteItem check if all the owner's
// dependents are deleted, if so, the owner will be deleted.
gb.attemptToDelete.Add(ownerNode)
}
}
return true
}

View File

@ -0,0 +1,135 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"fmt"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/client/retry"
)
// apiResource consults the REST mapper to translate an <apiVersion, kind,
// namespace> tuple to a unversioned.APIResource struct.
func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool) (*metav1.APIResource, error) {
fqKind := schema.FromAPIVersionAndKind(apiVersion, kind)
mapping, err := gc.restMapper.RESTMapping(fqKind.GroupKind(), apiVersion)
if err != nil {
return nil, fmt.Errorf("unable to get REST mapping for kind: %s, version: %s", kind, apiVersion)
}
glog.V(5).Infof("map kind %s, version %s to resource %s", kind, apiVersion, mapping.Resource)
resource := metav1.APIResource{
Name: mapping.Resource,
Namespaced: namespaced,
Kind: kind,
}
return &resource, nil
}
func (gc *GarbageCollector) deleteObject(item objectReference, policy *metav1.DeletionPropagation) error {
fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return err
}
uid := item.UID
preconditions := metav1.Preconditions{UID: &uid}
deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions, PropagationPolicy: policy}
return client.Resource(resource, item.Namespace).Delete(item.Name, &deleteOptions)
}
func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) {
fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Get(item.Name)
}
func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Update(obj)
}
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) {
fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Patch(item.Name, types.StrategicMergePatchType, patch)
}
// TODO: Using Patch when strategicmerge supports deleting an entry from a
// slice of a base type.
func (gc *GarbageCollector) removeFinalizer(owner *node, targetFinalizer string) error {
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ownerObject, err := gc.getObject(owner.identity)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return fmt.Errorf("cannot finalize owner %s, because cannot get it: %v. The garbage collector will retry later.", owner.identity, err)
}
accessor, err := meta.Accessor(ownerObject)
if err != nil {
return fmt.Errorf("cannot access the owner object %v: %v. The garbage collector will retry later.", ownerObject, err)
}
finalizers := accessor.GetFinalizers()
var newFinalizers []string
found := false
for _, f := range finalizers {
if f == targetFinalizer {
found = true
break
}
newFinalizers = append(newFinalizers, f)
}
if !found {
glog.V(5).Infof("the orphan finalizer is already removed from object %s", owner.identity)
return nil
}
// remove the owner from dependent's OwnerReferences
ownerObject.SetFinalizers(newFinalizers)
_, err = gc.updateObject(owner.identity, ownerObject)
return err
})
if errors.IsConflict(err) {
return fmt.Errorf("updateMaxRetries(%d) has reached. The garbage collector will retry later for owner %v.", retry.DefaultBackoff.Steps, owner.identity)
}
return err
}

View File

@ -0,0 +1,54 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package garbagecollector
import (
"encoding/json"
"fmt"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
)
func deleteOwnerRefPatch(dependentUID types.UID, ownerUIDs ...types.UID) []byte {
var pieces []string
for _, ownerUID := range ownerUIDs {
pieces = append(pieces, fmt.Sprintf(`{"$patch":"delete","uid":"%s"}`, ownerUID))
}
patch := fmt.Sprintf(`{"metadata":{"ownerReferences":[%s],"uid":"%s"}}`, strings.Join(pieces, ","), dependentUID)
return []byte(patch)
}
// generate a patch that unsets the BlockOwnerDeletion field of all
// ownerReferences of node.
func (n *node) patchToUnblockOwnerReferences() ([]byte, error) {
var dummy metaonly.MetadataOnlyObject
var blockingRefs []metav1.OwnerReference
falseVar := false
for _, owner := range n.owners {
if owner.BlockOwnerDeletion != nil && *owner.BlockOwnerDeletion {
ref := owner
ref.BlockOwnerDeletion = &falseVar
blockingRefs = append(blockingRefs, ref)
}
}
dummy.ObjectMeta.SetOwnerReferences(blockingRefs)
dummy.ObjectMeta.UID = n.identity.UID
return json.Marshal(dummy)
}

View File

@ -481,14 +481,14 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
go func() {
defer wg.Done()
var err error
var trueVar = true
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: getRSKind().GroupVersion().String(),
Kind: getRSKind().Kind,
Name: rs.Name,
UID: rs.UID,
Controller: &trueVar,
APIVersion: getRSKind().GroupVersion().String(),
Kind: getRSKind().Kind,
Name: rs.Name,
UID: rs.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
if err != nil {

View File

@ -479,13 +479,14 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl
go func() {
defer wg.Done()
var err error
var trueVar = true
boolPtr := func(b bool) *bool { return &b }
controllerRef := &metav1.OwnerReference{
APIVersion: getRCKind().GroupVersion().String(),
Kind: getRCKind().Kind,
Name: rc.Name,
UID: rc.UID,
Controller: &trueVar,
APIVersion: getRCKind().GroupVersion().String(),
Kind: getRCKind().Kind,
Name: rc.Name,
UID: rc.UID,
BlockOwnerDeletion: boolPtr(true),
Controller: boolPtr(true),
}
err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
if err != nil {