mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-14 06:15:45 +00:00
Check expectations before filtering through active pods.
This commit is contained in:
parent
2f2816368f
commit
54b6501349
@ -58,6 +58,17 @@ var expKeyFunc = func(obj interface{}) (string, error) {
|
|||||||
return "", fmt.Errorf("Could not find key for obj %#v", obj)
|
return "", fmt.Errorf("Could not find key for obj %#v", obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RCExpectationsManager is an interface that allows users to set and wait on expectations.
|
||||||
|
// Only abstracted out for testing.
|
||||||
|
type RCExpectationsManager interface {
|
||||||
|
GetExpectations(rc *api.ReplicationController) (*PodExpectations, bool, error)
|
||||||
|
SatisfiedExpectations(rc *api.ReplicationController) bool
|
||||||
|
ExpectCreations(rc *api.ReplicationController, adds int) error
|
||||||
|
ExpectDeletions(rc *api.ReplicationController, dels int) error
|
||||||
|
CreationObserved(rc *api.ReplicationController)
|
||||||
|
DeletionObserved(rc *api.ReplicationController)
|
||||||
|
}
|
||||||
|
|
||||||
// RCExpectations is a ttl cache mapping rcs to what they expect to see before being woken up for a sync.
|
// RCExpectations is a ttl cache mapping rcs to what they expect to see before being woken up for a sync.
|
||||||
type RCExpectations struct {
|
type RCExpectations struct {
|
||||||
cache.Store
|
cache.Store
|
||||||
|
@ -81,7 +81,7 @@ type ReplicationManager struct {
|
|||||||
// To allow injection of syncReplicationController for testing.
|
// To allow injection of syncReplicationController for testing.
|
||||||
syncHandler func(rcKey string) error
|
syncHandler func(rcKey string) error
|
||||||
// A TTLCache of pod creates/deletes each rc expects to see
|
// A TTLCache of pod creates/deletes each rc expects to see
|
||||||
expectations *RCExpectations
|
expectations RCExpectationsManager
|
||||||
// A store of controllers, populated by the rcController
|
// A store of controllers, populated by the rcController
|
||||||
controllerStore cache.StoreToControllerLister
|
controllerStore cache.StoreToControllerLister
|
||||||
// A store of pods, populated by the podController
|
// A store of pods, populated by the podController
|
||||||
@ -351,16 +351,20 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
|
|||||||
}
|
}
|
||||||
controller := *obj.(*api.ReplicationController)
|
controller := *obj.(*api.ReplicationController)
|
||||||
|
|
||||||
|
// Check the expectations of the rc before counting active pods, otherwise a new pod can sneak in
|
||||||
|
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
|
||||||
|
// the store after we've checked the expectation, the rc sync is just deferred till the next relist.
|
||||||
|
rcNeedsSync := rm.expectations.SatisfiedExpectations(&controller)
|
||||||
podList, err := rm.podStore.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector())
|
podList, err := rm.podStore.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Error getting pods for rc %q: %v", key, err)
|
glog.Errorf("Error getting pods for rc %q: %v", key, err)
|
||||||
rm.queue.Add(key)
|
rm.queue.Add(key)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Do this in a single pass, or use an index.
|
// TODO: Do this in a single pass, or use an index.
|
||||||
filteredPods := filterActivePods(podList.Items)
|
filteredPods := filterActivePods(podList.Items)
|
||||||
|
if rcNeedsSync {
|
||||||
if rm.expectations.SatisfiedExpectations(&controller) {
|
|
||||||
rm.manageReplicas(filteredPods, &controller)
|
rm.manageReplicas(filteredPods, &controller)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -953,3 +953,40 @@ func TestControllerBurstReplicas(t *testing.T) {
|
|||||||
doTestControllerBurstReplicas(t, 5, 12)
|
doTestControllerBurstReplicas(t, 5, 12)
|
||||||
doTestControllerBurstReplicas(t, 3, 2)
|
doTestControllerBurstReplicas(t, 3, 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type FakeRCExpectations struct {
|
||||||
|
*RCExpectations
|
||||||
|
satisfied bool
|
||||||
|
expSatisfied func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fe FakeRCExpectations) SatisfiedExpectations(rc *api.ReplicationController) bool {
|
||||||
|
fe.expSatisfied()
|
||||||
|
return fe.satisfied
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRCSyncExpectations tests that a pod cannot sneak in between counting active pods
|
||||||
|
// and checking expectations.
|
||||||
|
func TestRCSyncExpectations(t *testing.T) {
|
||||||
|
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
|
||||||
|
fakePodControl := FakePodControl{}
|
||||||
|
manager := NewReplicationManager(client, 2)
|
||||||
|
manager.podControl = &fakePodControl
|
||||||
|
|
||||||
|
controllerSpec := newReplicationController(2)
|
||||||
|
manager.controllerStore.Store.Add(controllerSpec)
|
||||||
|
pods := newPodList(nil, 2, api.PodPending, controllerSpec)
|
||||||
|
manager.podStore.Store.Add(&pods.Items[0])
|
||||||
|
postExpectationsPod := pods.Items[1]
|
||||||
|
|
||||||
|
manager.expectations = FakeRCExpectations{
|
||||||
|
NewRCExpectations(), true, func() {
|
||||||
|
// If we check active pods before checking expectataions, the rc
|
||||||
|
// will create a new replica because it doesn't see this pod, but
|
||||||
|
// has fulfilled its expectations.
|
||||||
|
manager.podStore.Store.Add(&postExpectationsPod)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
manager.syncReplicationController(getKey(controllerSpec, t))
|
||||||
|
validateSyncReplication(t, &fakePodControl, 0, 0)
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user