add finalizer logic to the API server and the garbage collector; handle DeleteOptions.OrphanDependents in the API server

Chao Xu
2016-05-17 20:24:42 -07:00
parent 5303794ef0
commit 1665546d2d
20 changed files with 1038 additions and 124 deletions
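The diff below is the garbage-collector half of the change; the API-server half that honors DeleteOptions.OrphanDependents lives in other files of this commit and is not shown here. As a rough, hypothetical sketch of the contract the collector relies on (not the actual API-server code), deleting an owner with OrphanDependents=true is expected to leave the object in place, set its DeletionTimestamp, and add api.FinalizerOrphan to its finalizers; shouldOrphanDependents() in the collector then reacts to exactly that state:

// Hypothetical sketch only, assuming the DeleteOptions.OrphanDependents field
// that this commit's message describes; the helper name here is illustrative,
// not the API server's real code path.
package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/unversioned"
)

// markForOrphaningDeletion stands in for the server-side handling: instead of
// removing the owner immediately, mark it deleted and add the orphan finalizer.
func markForOrphaningDeletion(meta *api.ObjectMeta, options *api.DeleteOptions) {
	if options != nil && options.OrphanDependents != nil && *options.OrphanDependents {
		now := unversioned.NewTime(time.Now())
		meta.DeletionTimestamp = &now
		// the collector's shouldOrphanDependents() looks for this finalizer on
		// an object whose DeletionTimestamp just went from nil to non-nil
		meta.Finalizers = append(meta.Finalizers, api.FinalizerOrphan)
	}
}

func main() {
	orphan := true
	meta := api.ObjectMeta{Name: "owner"}
	markForOrphaningDeletion(&meta, &api.DeleteOptions{OrphanDependents: &orphan})
	fmt.Println(meta.Finalizers, meta.DeletionTimestamp != nil)
}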


@@ -18,6 +18,7 @@ package garbagecollector
import (
"fmt"
"sync"
"time"
"github.com/golang/glog"
@@ -34,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
@@ -63,13 +65,27 @@ func (s objectReference) String() string {
// GarbageCollector.processItem() reads the nodes, but it only reads the fields
// that never get changed by Propagator.processEvent().
type node struct {
identity objectReference
dependents map[*node]struct{}
identity objectReference
// dependents will be read by the orphanFinalizer() routine, so we need to protect it with a lock.
dependentsLock *sync.RWMutex
dependents map[*node]struct{}
// When processing an Update event, we need to compare the updated
// ownerReferences with the owners recorded in the graph.
owners []metatypes.OwnerReference
}
func (ownerNode *node) addDependent(dependent *node) {
ownerNode.dependentsLock.Lock()
defer ownerNode.dependentsLock.Unlock()
ownerNode.dependents[dependent] = struct{}{}
}
func (ownerNode *node) deleteDependent(dependent *node) {
ownerNode.dependentsLock.Lock()
defer ownerNode.dependentsLock.Unlock()
delete(ownerNode.dependents, dependent)
}
type eventType int
const (
@@ -85,11 +101,35 @@ type event struct {
oldObj interface{}
}
type concurrentUIDToNode struct {
*sync.RWMutex
uidToNode map[types.UID]*node
}
func (m *concurrentUIDToNode) Write(node *node) {
m.Lock()
defer m.Unlock()
m.uidToNode[node.identity.UID] = node
}
func (m *concurrentUIDToNode) Read(uid types.UID) (*node, bool) {
m.RLock()
defer m.RUnlock()
n, ok := m.uidToNode[uid]
return n, ok
}
func (m *concurrentUIDToNode) Delete(uid types.UID) {
m.Lock()
defer m.Unlock()
delete(m.uidToNode, uid)
}
type Propagator struct {
eventQueue *workqueue.Type
// uidToNode doesn't require a lock to protect, because only the
// single-threaded Propagator.processEvent() reads/writes it.
uidToNode map[types.UID]*node
uidToNode *concurrentUIDToNode
gc *GarbageCollector
}
@@ -99,7 +139,7 @@ type Propagator struct {
// processItem() will verify if the owner exists according to the API server.
func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerReference) {
for _, owner := range owners {
ownerNode, ok := p.uidToNode[owner.UID]
ownerNode, ok := p.uidToNode.Read(owner.UID)
if !ok {
// Create a "virtual" node in the graph for the owner if it doesn't
// exist in the graph yet. Then enqueue the virtual node into the
@@ -111,37 +151,38 @@ func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerRefer
OwnerReference: owner,
Namespace: n.identity.Namespace,
},
dependents: make(map[*node]struct{}),
dependentsLock: &sync.RWMutex{},
dependents: make(map[*node]struct{}),
}
p.uidToNode[ownerNode.identity.UID] = ownerNode
p.uidToNode.Write(ownerNode)
p.gc.dirtyQueue.Add(ownerNode)
}
ownerNode.dependents[n] = struct{}{}
ownerNode.addDependent(n)
}
}
// insertNode inserts the node into p.uidToNode; then it finds all owners as listed
// in n.owners, and adds the node to their dependents list.
func (p *Propagator) insertNode(n *node) {
p.uidToNode[n.identity.UID] = n
p.uidToNode.Write(n)
p.addDependentToOwners(n, n.owners)
}
// removeDependentFromOwners removes n from owners' dependents list.
func (p *Propagator) removeDependentFromOwners(n *node, owners []metatypes.OwnerReference) {
for _, owner := range owners {
ownerNode, ok := p.uidToNode[owner.UID]
ownerNode, ok := p.uidToNode.Read(owner.UID)
if !ok {
continue
}
delete(ownerNode.dependents, n)
ownerNode.deleteDependent(n)
}
}
// removeNode removes the node from p.uidToNode, then finds all
// owners as listed in n.owners, and removes n from their dependents list.
func (p *Propagator) removeNode(n *node) {
delete(p.uidToNode, n.identity.UID)
p.uidToNode.Delete(n.identity.UID)
p.removeDependentFromOwners(n, n.owners)
}
@@ -171,6 +212,133 @@ func referencesDiffs(old []metatypes.OwnerReference, new []metatypes.OwnerRefere
return added, removed
}
func shouldOrphanDependents(e event, accessor meta.Object) bool {
// The delta_fifo may combine the creation and update of the object into one
// event, so we need to check AddEvent as well.
if e.oldObj == nil {
if accessor.GetDeletionTimestamp() == nil {
return false
}
} else {
oldAccessor, err := meta.Accessor(e.oldObj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
return false
}
// ignore the event if it's not updating DeletionTimestamp from nil to non-nil.
if accessor.GetDeletionTimestamp() == nil || oldAccessor.GetDeletionTimestamp() != nil {
return false
}
}
finalizers := accessor.GetFinalizers()
for _, finalizer := range finalizers {
if finalizer == api.FinalizerOrphan {
return true
}
}
return false
}
// dependents are copies of pointers to the owner's dependents; they don't need to be locked.
func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents []*node) error {
var errorsSlice []error
for _, dependent := range dependents {
// the dependent.identity.UID is used as a precondition
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, owner.UID, dependent.identity.UID)
_, err := gc.patchObject(dependent.identity, []byte(deleteOwnerRefPatch))
// note that if the target ownerReference doesn't exist in the
// dependent, strategic merge patch will NOT return an error.
if err != nil && !errors.IsNotFound(err) {
errorsSlice = append(errorsSlice, fmt.Errorf("orphaning %s failed with %v", dependent.identity, err))
}
}
if len(errorsSlice) != 0 {
return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error())
}
fmt.Println("CHAO: successfully updated all dependents")
return nil
}
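For reference, the patch built above has the following shape (a standalone illustration with made-up UIDs). The "$patch":"delete" directive asks the server's strategic-merge-patch handling to drop the ownerReferences entry whose uid matches, while the top-level metadata.uid is the precondition mentioned in the comment: the patch only applies if the dependent still has that UID.

// Standalone illustration of the orphaning patch; both UIDs are invented.
package main

import "fmt"

func main() {
	ownerUID := "11111111-aaaa-bbbb-cccc-222222222222"     // hypothetical owner UID
	dependentUID := "33333333-dddd-eeee-ffff-444444444444" // hypothetical dependent UID
	patch := fmt.Sprintf(
		`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`,
		ownerUID, dependentUID)
	fmt.Println(patch)
	// Sent with api.StrategicMergePatchType, this removes the matching entry
	// from the dependent's metadata.ownerReferences; if no entry matches, the
	// patch is a no-op rather than an error, which is why orphanDependents()
	// only records non-NotFound API errors.
}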
// TODO: Use Patch once strategic merge patch supports deleting an entry from a
// slice of a base type.
func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error {
const retries = 5
for count := 0; count < retries; count++ {
ownerObject, err := gc.getObject(owner.identity)
if err != nil {
return fmt.Errorf("cannot finalize owner %s, because cannot get it. The garbage collector will retry later.", owner.identity)
}
accessor, err := meta.Accessor(ownerObject)
if err != nil {
return fmt.Errorf("cannot access the owner object: %v. The garbage collector will retry later.", err)
}
finalizers := accessor.GetFinalizers()
var newFinalizers []string
found := false
for _, f := range finalizers {
if f == api.FinalizerOrphan {
found = true
} else {
newFinalizers = append(newFinalizers, f)
}
}
if !found {
glog.V(6).Infof("the orphan finalizer is already removed from object %s", owner.identity)
return nil
}
// remove the orphan finalizer from the owner's list of finalizers
ownerObject.SetFinalizers(newFinalizers)
_, err = gc.updateObject(owner.identity, ownerObject)
if err == nil {
return nil
}
if err != nil && !errors.IsConflict(err) {
return fmt.Errorf("cannot update the finalizers of owner %s, with error: %v, tried %d times", owner.identity, err, count+1)
}
// retry if it's a conflict
glog.V(6).Infof("got conflict updating the owner object %s, tried %d times", owner.identity, count+1)
}
return fmt.Errorf("updateMaxRetries(%d) has reached. The garbage collector will retry later for owner %v.", retries, owner.identity)
}
// orphanFinalizer dequeues a node from the orphanQueue, then finds its dependents
// based on the graph maintained by the GC, then removes the owner from the
// OwnerReferences of its dependents, and finally updates the owner to remove
// the "Orphan" finalizer. The node is added back into the orphanQueue if any of
// these steps fail.
func (gc *GarbageCollector) orphanFinalizer() {
key, quit := gc.orphanQueue.Get()
if quit {
return
}
defer gc.orphanQueue.Done(key)
owner, ok := key.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", key))
}
// we don't need to lock each element, because they never get updated
owner.dependentsLock.RLock()
dependents := make([]*node, 0, len(owner.dependents))
for dependent := range owner.dependents {
dependents = append(dependents, dependent)
}
owner.dependentsLock.RUnlock()
err := gc.orphanDependents(owner.identity, dependents)
if err != nil {
glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
gc.orphanQueue.Add(owner)
return
}
// update the owner: remove the orphan finalizer from its finalizers list
err = gc.removeOrphanFinalizer(owner)
if err != nil {
glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
gc.orphanQueue.Add(owner)
}
}
// processEvent dequeues an event from the eventQueue, updates the graph, and populates the dirtyQueue.
func (p *Propagator) processEvent() {
key, quit := p.eventQueue.Get()
@@ -196,7 +364,7 @@ func (p *Propagator) processEvent() {
}
glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
// Check if the node already exists
existingNode, found := p.uidToNode[accessor.GetUID()]
existingNode, found := p.uidToNode.Read(accessor.GetUID())
switch {
case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
newNode := &node{
@@ -209,13 +377,24 @@ func (p *Propagator) processEvent() {
},
Namespace: accessor.GetNamespace(),
},
dependents: make(map[*node]struct{}),
owners: accessor.GetOwnerReferences(),
dependentsLock: &sync.RWMutex{},
dependents: make(map[*node]struct{}),
owners: accessor.GetOwnerReferences(),
}
p.insertNode(newNode)
// the underlying delta_fifo may combine the creation and the update that sets the deletion timestamp into one event
if shouldOrphanDependents(event, accessor) {
glog.V(6).Infof("add %s to the orphanQueue", newNode.identity)
p.gc.orphanQueue.Add(newNode)
}
case (event.eventType == addEvent || event.eventType == updateEvent) && found:
// TODO: finalizer: Check if ObjectMeta.DeletionTimestamp is updated from nil to non-nil
// We only need to add/remove owner refs for now
// caveat: if GC observes the creation of the dependents later than the
// deletion of the owner, then the orphaning finalizer won't be effective.
if shouldOrphanDependents(event, accessor) {
glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity)
p.gc.orphanQueue.Add(existingNode)
}
// add/remove owner refs
added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
if len(added) == 0 && len(removed) == 0 {
glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event)
@@ -234,6 +413,8 @@ func (p *Propagator) processEvent() {
return
}
p.removeNode(existingNode)
existingNode.dependentsLock.RLock()
defer existingNode.dependentsLock.RUnlock()
for dep := range existingNode.dependents {
p.gc.dirtyQueue.Add(dep)
}
@@ -244,11 +425,12 @@ func (p *Propagator) processEvent() {
// removing ownerReferences from the dependents if the owner is deleted with
// DeleteOptions.OrphanDependents=true.
type GarbageCollector struct {
restMapper meta.RESTMapper
clientPool dynamic.ClientPool
dirtyQueue *workqueue.Type
monitors []monitor
propagator *Propagator
restMapper meta.RESTMapper
clientPool dynamic.ClientPool
dirtyQueue *workqueue.Type
orphanQueue *workqueue.Type
monitors []monitor
propagator *Propagator
}
func monitorFor(p *Propagator, clientPool dynamic.ClientPool, resource unversioned.GroupVersionResource) (monitor, error) {
@@ -320,15 +502,19 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
func NewGarbageCollector(clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
gc := &GarbageCollector{
clientPool: clientPool,
dirtyQueue: workqueue.New(),
clientPool: clientPool,
dirtyQueue: workqueue.New(),
orphanQueue: workqueue.New(),
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper: registered.RESTMapper(),
}
gc.propagator = &Propagator{
eventQueue: workqueue.New(),
uidToNode: make(map[types.UID]*node),
gc: gc,
uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
},
gc: gc,
}
for _, resource := range resources {
if _, ok := ignoredResources[resource]; ok {
@@ -396,6 +582,26 @@ func (gc *GarbageCollector) getObject(item objectReference) (*runtime.Unstructur
return client.Resource(resource, item.Namespace).Get(item.Name)
}
func (gc *GarbageCollector) updateObject(item objectReference, obj *runtime.Unstructured) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
if err != nil {
return nil, err
}
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Update(obj)
}
func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*runtime.Unstructured, error) {
fqKind := unversioned.FromAPIVersionAndKind(item.APIVersion, item.Kind)
client, err := gc.clientPool.ClientForGroupVersion(fqKind.GroupVersion())
if err != nil {
return nil, err
}
resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
if err != nil {
return nil, err
}
return client.Resource(resource, item.Namespace).Patch(item.Name, api.StrategicMergePatchType, patch)
}
func objectReferenceToUnstructured(ref objectReference) *runtime.Unstructured {
ret := &runtime.Unstructured{}
ret.SetKind(ref.Kind)
@@ -479,17 +685,19 @@ func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(gc.worker, 0, stopCh)
go wait.Until(gc.orphanFinalizer, 0, stopCh)
}
<-stopCh
glog.Infof("Shutting down garbage collector")
gc.dirtyQueue.ShutDown()
gc.orphanQueue.ShutDown()
gc.propagator.eventQueue.ShutDown()
}
// QueuesDrained returns true if the dirtyQueue, eventQueue, and orphanQueue are drained. It's
// useful for debugging.
// useful for debugging. Note that it doesn't guarantee the workers are idle.
func (gc *GarbageCollector) QueuesDrained() bool {
return gc.dirtyQueue.Len() == 0 && gc.propagator.eventQueue.Len() == 0
return gc.dirtyQueue.Len() == 0 && gc.propagator.eventQueue.Len() == 0 && gc.orphanQueue.Len() == 0
}
// *FOR TEST USE ONLY* It's not safe to call this function when the GC is still
@@ -498,7 +706,7 @@ func (gc *GarbageCollector) QueuesDrained() bool {
// uidToNode graph. It's useful for debugging.
func (gc *GarbageCollector) GraphHasUID(UIDs []types.UID) bool {
for _, u := range UIDs {
if _, ok := gc.propagator.uidToNode[u]; ok {
if _, ok := gc.propagator.uidToNode.Read(u); ok {
return true
}
}


@@ -273,7 +273,10 @@ func TestProcessEvent(t *testing.T) {
for _, scenario := range testScenarios {
propagator := &Propagator{
eventQueue: workqueue.New(),
uidToNode: make(map[types.UID]*node),
uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
},
gc: &GarbageCollector{
dirtyQueue: workqueue.New(),
},
@@ -281,7 +284,36 @@ func TestProcessEvent(t *testing.T) {
for i := 0; i < len(scenario.events); i++ {
propagator.eventQueue.Add(scenario.events[i])
propagator.processEvent()
verifyGraphInvariants(scenario.name, propagator.uidToNode, t)
verifyGraphInvariants(scenario.name, propagator.uidToNode.uidToNode, t)
}
}
}
// TestDependentsRace relies on golang's data race detector to check if there are
// data races on the dependents field.
func TestDependentsRace(t *testing.T) {
clientPool := dynamic.NewClientPool(&restclient.Config{}, dynamic.LegacyAPIPathResolverFunc)
podResource := []unversioned.GroupVersionResource{{Version: "v1", Resource: "pods"}}
gc, err := NewGarbageCollector(clientPool, podResource)
if err != nil {
t.Fatal(err)
}
const updates = 100
owner := &node{dependentsLock: &sync.RWMutex{}, dependents: make(map[*node]struct{})}
ownerUID := types.UID("owner")
gc.propagator.uidToNode.Write(owner)
go func() {
for i := 0; i < updates; i++ {
dependent := &node{}
gc.propagator.addDependentToOwners(dependent, []metatypes.OwnerReference{{UID: ownerUID}})
gc.propagator.removeDependentFromOwners(dependent, []metatypes.OwnerReference{{UID: ownerUID}})
}
}()
go func() {
gc.orphanQueue.Add(owner)
for i := 0; i < updates; i++ {
gc.orphanFinalizer()
}
}()
}
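TestDependentsRace only pays off when the race detector is enabled (go test -race). A stripped-down, self-contained sketch of the access pattern it exercises, using an invented fakeNode type: one goroutine mutates a node's dependents map while another iterates over it, which is exactly what dependentsLock serializes in addDependent(), deleteDependent(), and orphanFinalizer().

// Illustration only; fakeNode is not part of the commit.
package main

import "sync"

type fakeNode struct {
	lock       sync.RWMutex
	dependents map[*fakeNode]struct{}
}

func main() {
	owner := &fakeNode{dependents: make(map[*fakeNode]struct{})}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer, analogous to Propagator.addDependentToOwners
		defer wg.Done()
		for i := 0; i < 100; i++ {
			owner.lock.Lock()
			owner.dependents[&fakeNode{}] = struct{}{}
			owner.lock.Unlock()
		}
	}()
	go func() { // reader, analogous to GarbageCollector.orphanFinalizer
		defer wg.Done()
		for i := 0; i < 100; i++ {
			owner.lock.RLock()
			for range owner.dependents {
			}
			owner.lock.RUnlock()
		}
	}()
	wg.Wait()
	// Drop the lock/unlock pairs and `go run -race` reports the concurrent
	// map access; with them, the run is clean.
}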