Merge pull request #8822 from bprashanth/fifo_rc

Wake up rcs when pods get DeletionFinalStateUnknown tombstones
This commit is contained in:
Wojciech Tyczynski 2015-05-28 10:50:28 +02:00
commit 6ffe46a9e0
6 changed files with 122 additions and 36 deletions

View File

@ -22,6 +22,8 @@ import (
"sync" "sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
) )
// NewDeltaFIFO returns a Store which can be used process changes to items. // NewDeltaFIFO returns a Store which can be used process changes to items.
@ -76,6 +78,13 @@ func NewDeltaFIFO(keyFunc KeyFunc, compressor DeltaCompressor, knownObjectKeys K
// threads, you could end up with multiple threads processing slightly // threads, you could end up with multiple threads processing slightly
// different versions of the same object. // different versions of the same object.
// //
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
// to list keys that are "known", for the puspose of figuring out which
// items have been deleted when Replace() is called. If the given KeyLister
// also satisfies the KeyGetter interface, the deleted objet will be
// included in the DeleteFinalStateUnknown markers. These objects
// could be stale.
//
// You may provide a function to compress deltas (e.g., represent a // You may provide a function to compress deltas (e.g., represent a
// series of Updates as a single Update). // series of Updates as a single Update).
type DeltaFIFO struct { type DeltaFIFO struct {
@ -334,7 +343,21 @@ func (f *DeltaFIFO) Replace(list []interface{}) error {
continue continue
} }
} }
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k}); err != nil { var deletedObj interface{}
if keyGetter, ok := f.knownObjectKeys.(KeyGetter); ok {
var exists bool
var err error
deletedObj, exists, err = keyGetter.GetByKey(k)
if err != nil || !exists {
deletedObj = nil
if err != nil {
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else {
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
}
}
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err return err
} }
} }
@ -346,12 +369,9 @@ type KeyLister interface {
ListKeys() []string ListKeys() []string
} }
// KeyListerFunc adapts a raw function to be a KeyLister. // A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyListerFunc func() []string type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
// ListKeys just calls kl.
func (kl KeyListerFunc) ListKeys() []string {
return kl()
} }
// DeltaCompressor is an algorithm that removes redundant changes. // DeltaCompressor is an algorithm that removes redundant changes.
@ -427,8 +447,10 @@ func copyDeltas(d Deltas) Deltas {
} }
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
// an object was deleted but the watch deletion event was was missed. // an object was deleted but the watch deletion event was missed. In this
// In this case we don't know the final "resting" state of the object. // case we don't know the final "resting" state of the object, so there's
// a chance the included `Obj` is stale.
type DeletedFinalStateUnknown struct { type DeletedFinalStateUnknown struct {
Key string Key string
Obj interface{}
} }

View File

@ -27,6 +27,24 @@ func testPop(f *DeltaFIFO) testFifoObject {
return f.Pop().(Deltas).Newest().Object.(testFifoObject) return f.Pop().(Deltas).Newest().Object.(testFifoObject)
} }
// keyLookupFunc adapts a raw function to be a KeyLookup.
type keyLookupFunc func() []string
// ListKeys just calls kl.
func (kl keyLookupFunc) ListKeys() []string {
return kl()
}
// GetByKey returns the key if it exists in the list returned by kl.
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
for _, v := range kl() {
if v == key {
return key, true, nil
}
}
return nil, false, nil
}
func TestDeltaFIFO_basic(t *testing.T) { func TestDeltaFIFO_basic(t *testing.T) {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
const amount = 500 const amount = 500
@ -174,7 +192,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
f := NewDeltaFIFO( f := NewDeltaFIFO(
testFifoObjectKeyFunc, testFifoObjectKeyFunc,
nil, nil,
KeyListerFunc(func() []string { keyLookupFunc(func() []string {
return []string{"foo", "bar", "baz"} return []string{"foo", "bar", "baz"}
}), }),
) )
@ -184,7 +202,9 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
expectedList := []Deltas{ expectedList := []Deltas{
{{Deleted, mkFifoObj("baz", 10)}}, {{Deleted, mkFifoObj("baz", 10)}},
{{Sync, mkFifoObj("foo", 5)}}, {{Sync, mkFifoObj("foo", 5)}},
{{Deleted, DeletedFinalStateUnknown{Key: "bar"}}}, // Since "bar" didn't have a delete event and wasn't in the Replace list
// it should get a tombstone key with the right Obj.
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: "bar"}}},
} }
for _, expected := range expectedList { for _, expected := range expectedList {
@ -259,9 +279,9 @@ func TestDeltaFIFO_KeyOf(t *testing.T) {
key string key string
}{ }{
{obj: testFifoObject{name: "A"}, key: "A"}, {obj: testFifoObject{name: "A"}, key: "A"},
{obj: DeletedFinalStateUnknown{Key: "B"}, key: "B"}, {obj: DeletedFinalStateUnknown{Key: "B", Obj: nil}, key: "B"},
{obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"}, {obj: Deltas{{Object: testFifoObject{name: "C"}}}, key: "C"},
{obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D"}}}, key: "D"}, {obj: Deltas{{Object: DeletedFinalStateUnknown{Key: "D", Obj: nil}}}, key: "D"},
} }
for _, item := range table { for _, item := range table {

View File

@ -366,6 +366,16 @@ func (s *serviceCache) ListKeys() []string {
return keys return keys
} }
// GetByKey returns the value stored in the serviceMap under the given key
func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
if v, ok := s.serviceMap[key]; ok {
return v, true, nil
}
return nil, false, nil
}
// ListKeys implements the interface required by DeltaFIFO to list the keys we // ListKeys implements the interface required by DeltaFIFO to list the keys we
// already know about. // already know about.
func (s *serviceCache) allServices() []*cachedService { func (s *serviceCache) allServices() []*cachedService {

View File

@ -237,26 +237,28 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
// When a pod is deleted, enqueue the controller that manages the pod and update its expectations. // When a pod is deleted, enqueue the controller that manages the pod and update its expectations.
// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
func (rm *ReplicationManager) deletePod(obj interface{}) { func (rm *ReplicationManager) deletePod(obj interface{}) {
if pod, ok := obj.(*api.Pod); ok { pod, ok := obj.(*api.Pod)
if rc := rm.getPodControllers(pod); rc != nil {
rm.expectations.DeletionObserved(rc)
rm.enqueueController(rc)
}
return
}
// When a delete is dropped, the relist will notice a pod in the store not // When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone key. Since we don't // in the list, leading to the insertion of a tombstone object which contains
// know which rc to wake up/update expectations, we rely on the ttl on the // the deleted key/value. Note that this value might be stale. If the pod
// expectation expiring. The rc syncs via the 30s periodic resync and notices // changed labels the new rc will not be woken up till the periodic resync.
// fewer pods than its replica count. if !ok {
podKey, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj) tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if err != nil { if !ok {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err) glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, ExpectationsTimeout)
return return
}
pod, ok = tombstone.Obj.(*api.Pod)
if !ok {
glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, ExpectationsTimeout)
return
}
}
if rc := rm.getPodControllers(pod); rc != nil {
rm.expectations.DeletionObserved(rc)
rm.enqueueController(rc)
} }
// A periodic relist might not have a pod that the store has, in such cases we are sent a tombstone key.
// We don't know which controllers to sync, so just let the controller relist handle this.
glog.Infof("Pod %q was deleted but we don't have a record of its final state so it could take up to %v before a controller recreates a replica.", podKey, ExpectationsTimeout)
} }
// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item. // obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item.

View File

@ -252,6 +252,38 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
validateSyncReplication(t, &fakePodControl, 0, 1) validateSyncReplication(t, &fakePodControl, 0, 1)
} }
func TestDeleteFinalStateUnknown(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager.podControl = &fakePodControl
received := make(chan string)
manager.syncHandler = func(key string) error {
received <- key
return nil
}
// The DeletedFinalStateUnknown object should cause the rc manager to insert
// the controller matching the selectors of the deleted pod into the work queue.
controllerSpec := newReplicationController(1)
manager.controllerStore.Store.Add(controllerSpec)
pods := newPodList(nil, 1, api.PodRunning, controllerSpec)
manager.deletePod(cache.DeletedFinalStateUnknown{Key: "foo", Obj: &pods.Items[0]})
go manager.worker()
expected := getKey(controllerSpec, t)
select {
case key := <-received:
if key != expected {
t.Errorf("Unexpected sync all for rc %v, expected %v", key, expected)
}
case <-time.After(100 * time.Millisecond):
t.Errorf("Processing DeleteFinalStateUnknown took longer than expected")
}
}
func TestSyncReplicationControllerCreates(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
manager := NewReplicationManager(client, BurstReplicas) manager := NewReplicationManager(client, BurstReplicas)

View File

@ -638,7 +638,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
current := 0 current := 0
same := 0 same := 0
By(fmt.Sprintf("Creating replication controller %s", name)) By(fmt.Sprintf("%v Creating replication controller %s", time.Now(), name))
rc := &api.ReplicationController{ rc := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: name, Name: name,
@ -668,7 +668,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
if err != nil { if err != nil {
return fmt.Errorf("Error creating replication controller: %v", err) return fmt.Errorf("Error creating replication controller: %v", err)
} }
Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, ns, rc.Spec.Replicas) Logf("%v Created replication controller with name: %v, namespace: %v, replica count: %v", time.Now(), rc.Name, ns, rc.Spec.Replicas)
By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns)) By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name})) label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
@ -679,7 +679,7 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
current = len(pods.Items) current = len(pods.Items)
failCount := 5 failCount := 5
for same < failCount && current < replicas { for same < failCount && current < replicas {
Logf("Controller %s: Found %d pods out of %d", name, current, replicas) Logf("%v Controller %s: Found %d pods out of %d", time.Now(), name, current, replicas)
if last < current { if last < current {
same = 0 same = 0
} else if last == current { } else if last == current {
@ -703,9 +703,9 @@ func RunRC(c *client.Client, name string, ns, image string, replicas int) error
if current != replicas { if current != replicas {
return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas) return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
} }
Logf("Controller %s in ns %s: Found %d pods out of %d", name, ns, current, replicas) Logf("%v Controller %s in ns %s: Found %d pods out of %d", time.Now(), name, ns, current, replicas)
By(fmt.Sprintf("Waiting for all %d replicas to be running with a max container failures of %d", replicas, maxContainerFailures)) By(fmt.Sprintf("%v Waiting for all %d replicas to be running with a max container failures of %d", time.Now(), replicas, maxContainerFailures))
same = 0 same = 0
last = 0 last = 0
failCount = 10 failCount = 10