mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 05:03:09 +00:00
Merge pull request #19433 from janetkuo/deployment-e2e-flake-fix
Fix flaky e2e: Use expectation model for deployment's new rc creation
This commit is contained in:
commit
1acfb151b6
@ -76,16 +76,16 @@ func StaticResyncPeriodFunc(resyncPeriod time.Duration) ResyncPeriodFunc {
|
|||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// Implementation:
|
// Implementation:
|
||||||
// PodExpectation = pair of atomic counters to track pod creation/deletion
|
// ControlleeExpectation = pair of atomic counters to track controllee's creation/deletion
|
||||||
// ControllerExpectationsStore = TTLStore + a PodExpectation per controller
|
// ControllerExpectationsStore = TTLStore + a ControlleeExpectation per controller
|
||||||
//
|
//
|
||||||
// * Once set expectations can only be lowered
|
// * Once set expectations can only be lowered
|
||||||
// * A controller isn't synced till its expectations are either fulfilled, or expire
|
// * A controller isn't synced till its expectations are either fulfilled, or expire
|
||||||
// * Controllers that don't set expectations will get woken up for every matching pod
|
// * Controllers that don't set expectations will get woken up for every matching controllee
|
||||||
|
|
||||||
// ExpKeyFunc to parse out the key from a PodExpectation
|
// ExpKeyFunc to parse out the key from a ControlleeExpectation
|
||||||
var ExpKeyFunc = func(obj interface{}) (string, error) {
|
var ExpKeyFunc = func(obj interface{}) (string, error) {
|
||||||
if e, ok := obj.(*PodExpectations); ok {
|
if e, ok := obj.(*ControlleeExpectations); ok {
|
||||||
return e.key, nil
|
return e.key, nil
|
||||||
}
|
}
|
||||||
return "", fmt.Errorf("Could not find key for obj %#v", obj)
|
return "", fmt.Errorf("Could not find key for obj %#v", obj)
|
||||||
@ -96,7 +96,7 @@ var ExpKeyFunc = func(obj interface{}) (string, error) {
|
|||||||
// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
|
// Warning: if using KeyFunc it is not safe to use a single ControllerExpectationsInterface with different
|
||||||
// types of controllers, because the keys might conflict across types.
|
// types of controllers, because the keys might conflict across types.
|
||||||
type ControllerExpectationsInterface interface {
|
type ControllerExpectationsInterface interface {
|
||||||
GetExpectations(controllerKey string) (*PodExpectations, bool, error)
|
GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error)
|
||||||
SatisfiedExpectations(controllerKey string) bool
|
SatisfiedExpectations(controllerKey string) bool
|
||||||
DeleteExpectations(controllerKey string)
|
DeleteExpectations(controllerKey string)
|
||||||
SetExpectations(controllerKey string, add, del int) error
|
SetExpectations(controllerKey string, add, del int) error
|
||||||
@ -111,10 +111,10 @@ type ControllerExpectations struct {
|
|||||||
cache.Store
|
cache.Store
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetExpectations returns the PodExpectations of the given controller.
|
// GetExpectations returns the ControlleeExpectations of the given controller.
|
||||||
func (r *ControllerExpectations) GetExpectations(controllerKey string) (*PodExpectations, bool, error) {
|
func (r *ControllerExpectations) GetExpectations(controllerKey string) (*ControlleeExpectations, bool, error) {
|
||||||
if podExp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
|
if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
|
||||||
return podExp.(*PodExpectations), true, nil
|
return exp.(*ControlleeExpectations), true, nil
|
||||||
} else {
|
} else {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
@ -122,22 +122,22 @@ func (r *ControllerExpectations) GetExpectations(controllerKey string) (*PodExpe
|
|||||||
|
|
||||||
// DeleteExpectations deletes the expectations of the given controller from the TTLStore.
|
// DeleteExpectations deletes the expectations of the given controller from the TTLStore.
|
||||||
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
|
func (r *ControllerExpectations) DeleteExpectations(controllerKey string) {
|
||||||
if podExp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
|
if exp, exists, err := r.GetByKey(controllerKey); err == nil && exists {
|
||||||
if err := r.Delete(podExp); err != nil {
|
if err := r.Delete(exp); err != nil {
|
||||||
glog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
|
glog.V(2).Infof("Error deleting expectations for controller %v: %v", controllerKey, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
|
// SatisfiedExpectations returns true if the required adds/dels for the given controller have been observed.
|
||||||
// Add/del counts are established by the controller at sync time, and updated as pods are observed by the controller
|
// Add/del counts are established by the controller at sync time, and updated as controllees are observed by the controller
|
||||||
// manager.
|
// manager.
|
||||||
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
|
func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) bool {
|
||||||
if podExp, exists, err := r.GetExpectations(controllerKey); exists {
|
if exp, exists, err := r.GetExpectations(controllerKey); exists {
|
||||||
if podExp.Fulfilled() {
|
if exp.Fulfilled() {
|
||||||
return true
|
return true
|
||||||
} else {
|
} else {
|
||||||
glog.V(4).Infof("Controller still waiting on expectations %#v", podExp)
|
glog.V(4).Infof("Controller still waiting on expectations %#v", exp)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -145,9 +145,9 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) boo
|
|||||||
} else {
|
} else {
|
||||||
// When a new controller is created, it doesn't have expectations.
|
// When a new controller is created, it doesn't have expectations.
|
||||||
// When it doesn't see expected watch events for > TTL, the expectations expire.
|
// When it doesn't see expected watch events for > TTL, the expectations expire.
|
||||||
// - In this case it wakes up, creates/deletes pods, and sets expectations again.
|
// - In this case it wakes up, creates/deletes controllees, and sets expectations again.
|
||||||
// When it has satisfied expectations and no pods need to be created/destroyed > TTL, the expectations expire.
|
// When it has satisfied expectations and no controllees need to be created/destroyed > TTL, the expectations expire.
|
||||||
// - In this case it continues without setting expectations till it needs to create/delete pods.
|
// - In this case it continues without setting expectations till it needs to create/delete controllees.
|
||||||
glog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
|
glog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", controllerKey)
|
||||||
}
|
}
|
||||||
// Trigger a sync if we either encountered and error (which shouldn't happen since we're
|
// Trigger a sync if we either encountered and error (which shouldn't happen since we're
|
||||||
@ -157,9 +157,9 @@ func (r *ControllerExpectations) SatisfiedExpectations(controllerKey string) boo
|
|||||||
|
|
||||||
// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
|
// SetExpectations registers new expectations for the given controller. Forgets existing expectations.
|
||||||
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
|
func (r *ControllerExpectations) SetExpectations(controllerKey string, add, del int) error {
|
||||||
podExp := &PodExpectations{add: int64(add), del: int64(del), key: controllerKey}
|
exp := &ControlleeExpectations{add: int64(add), del: int64(del), key: controllerKey}
|
||||||
glog.V(4).Infof("Setting expectations %+v", podExp)
|
glog.V(4).Infof("Setting expectations %+v", exp)
|
||||||
return r.Add(podExp)
|
return r.Add(exp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
|
func (r *ControllerExpectations) ExpectCreations(controllerKey string, adds int) error {
|
||||||
@ -172,10 +172,10 @@ func (r *ControllerExpectations) ExpectDeletions(controllerKey string, dels int)
|
|||||||
|
|
||||||
// Decrements the expectation counts of the given controller.
|
// Decrements the expectation counts of the given controller.
|
||||||
func (r *ControllerExpectations) lowerExpectations(controllerKey string, add, del int) {
|
func (r *ControllerExpectations) lowerExpectations(controllerKey string, add, del int) {
|
||||||
if podExp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
|
if exp, exists, err := r.GetExpectations(controllerKey); err == nil && exists {
|
||||||
podExp.Seen(int64(add), int64(del))
|
exp.Seen(int64(add), int64(del))
|
||||||
// The expectations might've been modified since the update on the previous line.
|
// The expectations might've been modified since the update on the previous line.
|
||||||
glog.V(4).Infof("Lowering expectations %+v", podExp)
|
glog.V(4).Infof("Lowering expectations %+v", exp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,31 +194,31 @@ type Expectations interface {
|
|||||||
Fulfilled() bool
|
Fulfilled() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// PodExpectations track pod creates/deletes.
|
// ControlleeExpectations track controllee creates/deletes.
|
||||||
type PodExpectations struct {
|
type ControlleeExpectations struct {
|
||||||
add int64
|
add int64
|
||||||
del int64
|
del int64
|
||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Seen decrements the add and del counters.
|
// Seen decrements the add and del counters.
|
||||||
func (e *PodExpectations) Seen(add, del int64) {
|
func (e *ControlleeExpectations) Seen(add, del int64) {
|
||||||
atomic.AddInt64(&e.add, -add)
|
atomic.AddInt64(&e.add, -add)
|
||||||
atomic.AddInt64(&e.del, -del)
|
atomic.AddInt64(&e.del, -del)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fulfilled returns true if this expectation has been fulfilled.
|
// Fulfilled returns true if this expectation has been fulfilled.
|
||||||
func (e *PodExpectations) Fulfilled() bool {
|
func (e *ControlleeExpectations) Fulfilled() bool {
|
||||||
// TODO: think about why this line being atomic doesn't matter
|
// TODO: think about why this line being atomic doesn't matter
|
||||||
return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
|
return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetExpectations returns the add and del expectations of the pod.
|
// GetExpectations returns the add and del expectations of the controllee.
|
||||||
func (e *PodExpectations) GetExpectations() (int64, int64) {
|
func (e *ControlleeExpectations) GetExpectations() (int64, int64) {
|
||||||
return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
|
return atomic.LoadInt64(&e.add), atomic.LoadInt64(&e.del)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewControllerExpectations returns a store for PodExpectations.
|
// NewControllerExpectations returns a store for ControlleeExpectations.
|
||||||
func NewControllerExpectations() *ControllerExpectations {
|
func NewControllerExpectations() *ControllerExpectations {
|
||||||
return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)}
|
return &ControllerExpectations{cache.NewTTLStore(ExpKeyFunc, ExpectationsTimeout)}
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,11 @@ type DeploymentController struct {
|
|||||||
podStoreSynced func() bool
|
podStoreSynced func() bool
|
||||||
|
|
||||||
// A TTLCache of pod creates/deletes each deployment expects to see
|
// A TTLCache of pod creates/deletes each deployment expects to see
|
||||||
expectations controller.ControllerExpectationsInterface
|
podExpectations controller.ControllerExpectationsInterface
|
||||||
|
|
||||||
|
// A TTLCache of rc creates/deletes each deployment expects to see
|
||||||
|
// TODO: make expectation model understand (rc) updates (besides adds and deletes)
|
||||||
|
rcExpectations controller.ControllerExpectationsInterface
|
||||||
|
|
||||||
// Deployments that need to be synced
|
// Deployments that need to be synced
|
||||||
queue *workqueue.Type
|
queue *workqueue.Type
|
||||||
@ -91,11 +95,12 @@ func NewDeploymentController(client client.Interface, resyncPeriod controller.Re
|
|||||||
eventBroadcaster.StartRecordingToSink(client.Events(""))
|
eventBroadcaster.StartRecordingToSink(client.Events(""))
|
||||||
|
|
||||||
dc := &DeploymentController{
|
dc := &DeploymentController{
|
||||||
client: client,
|
client: client,
|
||||||
expClient: client.Extensions(),
|
expClient: client.Extensions(),
|
||||||
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
|
eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
|
||||||
queue: workqueue.New(),
|
queue: workqueue.New(),
|
||||||
expectations: controller.NewControllerExpectations(),
|
podExpectations: controller.NewControllerExpectations(),
|
||||||
|
rcExpectations: controller.NewControllerExpectations(),
|
||||||
}
|
}
|
||||||
|
|
||||||
dc.dStore.Store, dc.dController = framework.NewInformer(
|
dc.dStore.Store, dc.dController = framework.NewInformer(
|
||||||
@ -192,6 +197,12 @@ func (dc *DeploymentController) addRC(obj interface{}) {
|
|||||||
rc := obj.(*api.ReplicationController)
|
rc := obj.(*api.ReplicationController)
|
||||||
glog.V(4).Infof("Replication controller %s added.", rc.Name)
|
glog.V(4).Infof("Replication controller %s added.", rc.Name)
|
||||||
if d := dc.getDeploymentForRC(rc); rc != nil {
|
if d := dc.getDeploymentForRC(rc); rc != nil {
|
||||||
|
dKey, err := controller.KeyFunc(d)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
dc.rcExpectations.CreationObserved(dKey)
|
||||||
dc.enqueueDeployment(d)
|
dc.enqueueDeployment(d)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -329,7 +340,7 @@ func (dc *DeploymentController) deletePod(obj interface{}) {
|
|||||||
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
|
glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dc.expectations.DeletionObserved(dKey)
|
dc.podExpectations.DeletionObserved(dKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -384,7 +395,8 @@ func (dc *DeploymentController) syncDeployment(key string) error {
|
|||||||
}
|
}
|
||||||
if !exists {
|
if !exists {
|
||||||
glog.Infof("Deployment has been deleted %v", key)
|
glog.Infof("Deployment has been deleted %v", key)
|
||||||
dc.expectations.DeleteExpectations(key)
|
dc.podExpectations.DeleteExpectations(key)
|
||||||
|
dc.rcExpectations.DeleteExpectations(key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
d := *obj.(*extensions.Deployment)
|
d := *obj.(*extensions.Deployment)
|
||||||
@ -392,7 +404,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
|
|||||||
// Sleep so we give the rc reflector goroutine a chance to run.
|
// Sleep so we give the rc reflector goroutine a chance to run.
|
||||||
time.Sleep(RcStoreSyncedPollPeriod)
|
time.Sleep(RcStoreSyncedPollPeriod)
|
||||||
glog.Infof("Waiting for rc controller to sync, requeuing deployment %s", d.Name)
|
glog.Infof("Waiting for rc controller to sync, requeuing deployment %s", d.Name)
|
||||||
dc.enqueueDeployment(d)
|
dc.enqueueDeployment(&d)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -506,6 +518,15 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api
|
|||||||
if err != nil || existingNewRC != nil {
|
if err != nil || existingNewRC != nil {
|
||||||
return existingNewRC, err
|
return existingNewRC, err
|
||||||
}
|
}
|
||||||
|
// Check the rc expectations of deployment before creating a new rc
|
||||||
|
dKey, err := controller.KeyFunc(&deployment)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't get key for deployment %#v: %v", deployment, err)
|
||||||
|
}
|
||||||
|
if !dc.rcExpectations.SatisfiedExpectations(dKey) {
|
||||||
|
dc.enqueueDeployment(&deployment)
|
||||||
|
return nil, fmt.Errorf("RC expectations not met yet before getting new RC\n")
|
||||||
|
}
|
||||||
// new RC does not exist, create one.
|
// new RC does not exist, create one.
|
||||||
namespace := deployment.ObjectMeta.Namespace
|
namespace := deployment.ObjectMeta.Namespace
|
||||||
podTemplateSpecHash := deploymentutil.GetPodTemplateSpecHash(deployment.Spec.Template)
|
podTemplateSpecHash := deploymentutil.GetPodTemplateSpecHash(deployment.Spec.Template)
|
||||||
@ -513,6 +534,13 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api
|
|||||||
// Add podTemplateHash label to selector.
|
// Add podTemplateHash label to selector.
|
||||||
newRCSelector := deploymentutil.CloneAndAddLabel(deployment.Spec.Selector, deployment.Spec.UniqueLabelKey, podTemplateSpecHash)
|
newRCSelector := deploymentutil.CloneAndAddLabel(deployment.Spec.Selector, deployment.Spec.UniqueLabelKey, podTemplateSpecHash)
|
||||||
|
|
||||||
|
// Set RC expectations (1 rc should be created)
|
||||||
|
dKey, err = controller.KeyFunc(&deployment)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't get key for deployment controller %#v: %v", deployment, err)
|
||||||
|
}
|
||||||
|
dc.rcExpectations.ExpectCreations(dKey, 1)
|
||||||
|
// Create new RC
|
||||||
newRC := api.ReplicationController{
|
newRC := api.ReplicationController{
|
||||||
ObjectMeta: api.ObjectMeta{
|
ObjectMeta: api.ObjectMeta{
|
||||||
GenerateName: deployment.Name + "-",
|
GenerateName: deployment.Name + "-",
|
||||||
@ -526,6 +554,7 @@ func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api
|
|||||||
}
|
}
|
||||||
createdRC, err := dc.client.ReplicationControllers(namespace).Create(&newRC)
|
createdRC, err := dc.client.ReplicationControllers(namespace).Create(&newRC)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
dc.rcExpectations.DeleteExpectations(dKey)
|
||||||
return nil, fmt.Errorf("error creating replication controller: %v", err)
|
return nil, fmt.Errorf("error creating replication controller: %v", err)
|
||||||
}
|
}
|
||||||
return createdRC, nil
|
return createdRC, nil
|
||||||
@ -587,8 +616,8 @@ func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControl
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
|
return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
|
||||||
}
|
}
|
||||||
if expectationsCheck && !dc.expectations.SatisfiedExpectations(dKey) {
|
if expectationsCheck && !dc.podExpectations.SatisfiedExpectations(dKey) {
|
||||||
fmt.Printf("Expectations not met yet before reconciling old RCs\n")
|
glog.V(4).Infof("Pod expectations not met yet before reconciling old RCs\n")
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
// Find the number of ready pods.
|
// Find the number of ready pods.
|
||||||
@ -624,7 +653,7 @@ func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControl
|
|||||||
return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
|
return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
|
||||||
}
|
}
|
||||||
if expectationsCheck {
|
if expectationsCheck {
|
||||||
dc.expectations.ExpectDeletions(dKey, scaleDownCount)
|
dc.podExpectations.ExpectDeletions(dKey, scaleDownCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true, err
|
return true, err
|
||||||
|
Loading…
Reference in New Issue
Block a user