Handle overlapping deployments gracefully

1. When overlapping deployments are discovered, annotate them
2. Expose those overlapping annotations as warnings in kubectl describe
3. Only respect the earliest updated one (skip syncing all other overlapping deployments)
4. Use indexer instead of store for deployment lister
This commit is contained in:
Janet Kuo 2016-08-16 18:47:15 -07:00
parent fe808ec2a4
commit 90557ec56c
8 changed files with 233 additions and 41 deletions

View File

@ -285,12 +285,12 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments.
type StoreToDeploymentLister struct {
Store
Indexer
}
// Exists checks if the given deployment exists in the store.
func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) {
_, exists, err := s.Store.Get(deployment)
_, exists, err := s.Indexer.Get(deployment)
if err != nil {
return false, err
}
@ -300,7 +300,7 @@ func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (boo
// StoreToDeploymentLister lists all deployments in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) {
for _, c := range s.Store.List() {
for _, c := range s.Indexer.List() {
deployments = append(deployments, *(c.(*extensions.Deployment)))
}
return deployments, nil
@ -308,20 +308,17 @@ func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, e
// GetDeploymentsForReplicaSet returns a list of deployments managing a replica set. Returns an error only if no matching deployments are found.
func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.ReplicaSet) (deployments []extensions.Deployment, err error) {
var d extensions.Deployment
if len(rs.Labels) == 0 {
err = fmt.Errorf("no deployments found for ReplicaSet %v because it has no labels", rs.Name)
return
}
// TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label
for _, m := range s.Store.List() {
d = *m.(*extensions.Deployment)
if d.Namespace != rs.Namespace {
continue
}
dList, err := s.Deployments(rs.Namespace).List(labels.Everything())
if err != nil {
return
}
for _, d := range dList {
selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector)
if err != nil {
return nil, fmt.Errorf("invalid label selector: %v", err)
@ -338,6 +335,81 @@ func (s *StoreToDeploymentLister) GetDeploymentsForReplicaSet(rs *extensions.Rep
return
}
// storeToDeploymentNamespacer lists the deployments held in an Indexer, restricted
// to a single namespace (or to every namespace when namespace is api.NamespaceAll).
type storeToDeploymentNamespacer struct {
// indexer is the cache the deployments are read from.
indexer Indexer
// namespace is the namespace List is restricted to.
namespace string
}
// List returns the deployments in the lister's namespace whose labels match selector.
// When the namespace is api.NamespaceAll every deployment in the indexer is considered.
// Otherwise the namespace index is consulted; if the index lookup fails, a warning is
// logged and the whole indexer is scanned instead. The returned error is always nil.
func (s storeToDeploymentNamespacer) List(selector labels.Selector) (deployments []extensions.Deployment, err error) {
	// collect appends every object that passes the label selector and, when
	// sameNamespaceOnly is set, also lives in s.namespace.
	collect := func(objs []interface{}, sameNamespaceOnly bool) {
		for _, obj := range objs {
			deployment := *(obj.(*extensions.Deployment))
			if sameNamespaceOnly && deployment.Namespace != s.namespace {
				continue
			}
			if selector.Matches(labels.Set(deployment.Labels)) {
				deployments = append(deployments, deployment)
			}
		}
	}

	if s.namespace == api.NamespaceAll {
		collect(s.indexer.List(), false)
		return deployments, nil
	}

	nsKey := &extensions.Deployment{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
	indexed, indexErr := s.indexer.Index(NamespaceIndex, nsKey)
	if indexErr != nil {
		// Ignore error; do slow search without index.
		glog.Warningf("can not retrieve list of objects using index : %v", indexErr)
		collect(s.indexer.List(), true)
		return deployments, nil
	}

	collect(indexed, false)
	return deployments, nil
}
// Deployments returns a lister over the same indexer that is scoped to the given namespace.
func (s *StoreToDeploymentLister) Deployments(namespace string) storeToDeploymentNamespacer {
	return storeToDeploymentNamespacer{
		indexer:   s.Indexer,
		namespace: namespace,
	}
}
// GetDeploymentsForPod returns the deployments in the pod's namespace whose selector
// matches the pod's labels.
//
// An error is returned when the pod has no labels, when a deployment carries an invalid
// label selector, or when no deployment matches. A pod that lacks the
// extensions.DefaultDeploymentUniqueLabelKey label yields no deployments and no error.
func (s *StoreToDeploymentLister) GetDeploymentsForPod(pod *api.Pod) (deployments []extensions.Deployment, err error) {
	if len(pod.Labels) == 0 {
		return nil, fmt.Errorf("no deployments found for Pod %v because it has no labels", pod.Name)
	}
	if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] == "" {
		return nil, nil
	}

	candidates, listErr := s.Deployments(pod.Namespace).List(labels.Everything())
	if listErr != nil {
		return nil, listErr
	}

	podLabels := labels.Set(pod.Labels)
	for i := range candidates {
		sel, selErr := unversioned.LabelSelectorAsSelector(candidates[i].Spec.Selector)
		if selErr != nil {
			return nil, fmt.Errorf("invalid label selector: %v", selErr)
		}
		// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
		if !sel.Empty() && sel.Matches(podLabels) {
			deployments = append(deployments, candidates[i])
		}
	}

	if len(deployments) == 0 {
		return nil, fmt.Errorf("could not find deployments set for Pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
	}
	return deployments, nil
}
// StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets.
type StoreToReplicaSetLister struct {
Store

View File

@ -23,6 +23,7 @@ package deployment
import (
"fmt"
"reflect"
"sort"
"time"
"github.com/golang/glog"
@ -36,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
@ -108,7 +110,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
}
dc.dStore.Store, dc.dController = framework.NewInformer(
dc.dStore.Indexer, dc.dController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.client.Extensions().Deployments(api.NamespaceAll).List(options)
@ -125,6 +127,7 @@ func NewDeploymentController(client clientset.Interface, resyncPeriod controller
// This will enter the sync loop and no-op, because the deployment has been deleted from the store.
DeleteFunc: dc.deleteDeploymentNotification,
},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.rsStore.Store, dc.rsController = framework.NewInformer(
@ -252,7 +255,6 @@ func (dc *DeploymentController) addReplicaSet(obj interface{}) {
}
// getDeploymentForReplicaSet returns the deployment managing the given ReplicaSet.
// TODO: Surface that we are ignoring multiple deployments for a given ReplicaSet.
func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.ReplicaSet) *extensions.Deployment {
deployments, err := dc.dStore.GetDeploymentsForReplicaSet(rs)
if err != nil || len(deployments) == 0 {
@ -262,8 +264,11 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
// Because all ReplicaSet's belonging to a deployment should have a unique label key,
// there should never be more than one deployment returned by the above method.
// If that happens we should probably dynamically repair the situation by ultimately
// trying to clean up one of the controllers, for now we just return one of the two,
// likely randomly.
// trying to clean up one of the controllers, for now we just return the older one
if len(deployments) > 1 {
sort.Sort(util.BySelectorLastUpdateTime(deployments))
glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
}
return &deployments[0]
}
@ -321,22 +326,20 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
}
}
// getDeploymentForPod returns the deployment managing the ReplicaSet that manages the given Pod.
// TODO: Surface that we are ignoring multiple deployments for a given Pod.
// getDeploymentForPod returns the deployment that manages the given Pod.
// If there are multiple deployments for a given Pod, only return the oldest one.
func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
rss, err := dc.rsStore.GetPodReplicaSets(pod)
if err != nil {
glog.V(4).Infof("Error: %v. No ReplicaSets found for pod %v, deployment controller will avoid syncing.", err, pod.Name)
deployments, err := dc.dStore.GetDeploymentsForPod(pod)
if err != nil || len(deployments) == 0 {
glog.V(4).Infof("Error: %v. No deployment found for Pod %v, deployment controller will avoid syncing.", err, pod.Name)
return nil
}
for _, rs := range rss {
deployments, err := dc.dStore.GetDeploymentsForReplicaSet(&rs)
if err == nil && len(deployments) > 0 {
return &deployments[0]
}
if len(deployments) > 1 {
sort.Sort(util.BySelectorLastUpdateTime(deployments))
glog.Errorf("user error! more than one deployment is selecting pod %s/%s with labels: %#v, returning %s/%s", pod.Namespace, pod.Name, pod.Labels, deployments[0].Namespace, deployments[0].Name)
}
glog.V(4).Infof("No deployments found for pod %v, deployment controller will avoid syncing.", pod.Name)
return nil
return &deployments[0]
}
// When a pod is created, ensure its controller syncs
@ -407,15 +410,28 @@ func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deploym
return
}
// TODO: Handle overlapping deployments better. Either disallow them at admission time or
// deterministically avoid syncing deployments that fight over ReplicaSet's. Currently, we
// only ensure that the same deployment is synced for a given ReplicaSet. When we
// periodically relist all deployments there will still be some ReplicaSet instability. One
// way to handle this is by querying the store for all deployments that this deployment
// overlaps, as well as all deployments that overlap this deployments, and sorting them.
dc.queue.Add(key)
}
// markDeploymentOverlap records withDeployment in the deployment's util.OverlapAnnotation
// and sends the change through the client. If the annotation already holds that value the
// deployment is returned unchanged and no update is sent.
func (dc *DeploymentController) markDeploymentOverlap(deployment *extensions.Deployment, withDeployment string) (*extensions.Deployment, error) {
	// Indexing a nil map yields "", so this check is safe before the map is allocated.
	if current := deployment.Annotations[util.OverlapAnnotation]; current == withDeployment {
		return deployment, nil
	}
	if deployment.Annotations == nil {
		deployment.Annotations = map[string]string{}
	}
	deployment.Annotations[util.OverlapAnnotation] = withDeployment

	deploymentsClient := dc.client.Extensions().Deployments(deployment.Namespace)
	return deploymentsClient.Update(deployment)
}
// clearDeploymentOverlap removes util.OverlapAnnotation from the deployment and sends the
// change through the client. If the annotation is absent or empty the deployment is
// returned unchanged and no update is sent.
func (dc *DeploymentController) clearDeploymentOverlap(deployment *extensions.Deployment) (*extensions.Deployment, error) {
	if deployment.Annotations[util.OverlapAnnotation] == "" {
		return deployment, nil
	}
	delete(deployment.Annotations, util.OverlapAnnotation)

	deploymentsClient := dc.client.Extensions().Deployments(deployment.Namespace)
	return deploymentsClient.Update(deployment)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dc *DeploymentController) worker() {
@ -463,7 +479,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := dc.dStore.Store.GetByKey(key)
obj, exists, err := dc.dStore.Indexer.GetByKey(key)
if err != nil {
glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
return err
@ -491,6 +507,11 @@ func (dc *DeploymentController) syncDeployment(key string) error {
return dc.syncStatusOnly(d)
}
// Handle overlapping deployments by deterministically avoiding syncing deployments that fight over ReplicaSets.
if err = dc.handleOverlap(d); err != nil {
return err
}
if d.Spec.Paused {
return dc.sync(d)
}
@ -518,3 +539,40 @@ func (dc *DeploymentController) syncDeployment(key string) error {
}
return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}
// handleOverlap lists all deployments in d's namespace and checks whether d's selector
// matches the pod template labels of any other deployment. Each overlapping pair is
// annotated with util.OverlapAnnotation on a best-effort basis. If an overlapping
// deployment whose selector was updated before d's is found, an error is returned so that
// d is not synced (only the oldest one is synced). New/old is determined by when the
// deployment's selector was last updated. When no overlap is found, any overlap annotation
// on d is cleared, also on a best-effort basis.
func (dc *DeploymentController) handleOverlap(d *extensions.Deployment) error {
	selector, err := unversioned.LabelSelectorAsSelector(d.Spec.Selector)
	if err != nil {
		return fmt.Errorf("deployment %s/%s has invalid label selector: %v", d.Namespace, d.Name, err)
	}
	deployments, err := dc.dStore.Deployments(d.Namespace).List(labels.Everything())
	if err != nil {
		return fmt.Errorf("error listing deployments in namespace %s: %v", d.Namespace, err)
	}
	overlapping := false
	for i := range deployments {
		other := &deployments[i]
		// An empty selector matches nothing, and a deployment never overlaps with itself.
		if selector.Empty() || d.UID == other.UID || !selector.Matches(labels.Set(other.Spec.Template.Labels)) {
			continue
		}
		overlapping = true
		// We don't care if the overlapping annotation update failed or not (we don't make
		// decision on it). Only adopt the returned object when the update succeeded: after
		// a failure the returned value is not guaranteed to be usable, and replacing d or
		// other with it would corrupt the name lookup and timestamp comparison below.
		if updated, updateErr := dc.markDeploymentOverlap(d, other.Name); updateErr == nil && updated != nil {
			d = updated
		}
		if updated, updateErr := dc.markDeploymentOverlap(other, d.Name); updateErr == nil && updated != nil {
			other = updated
		}
		// Skip syncing this one if older overlapping one is found
		// TODO: figure out a better way to determine which deployment to skip,
		// either with controller reference, or with validation.
		// Using oldest active replica set to determine which deployment to skip wouldn't make much difference,
		// since new replica set hasn't been created after selector update
		if util.SelectorUpdatedBefore(other, d) {
			return fmt.Errorf("found deployment %s/%s has overlapping selector with an older deployment %s/%s, skip syncing it", d.Namespace, d.Name, other.Namespace, other.Name)
		}
	}
	if !overlapping {
		// We don't care if the overlapping annotation update failed or not (we don't make
		// decision on it), and the result is not needed afterwards.
		_, _ = dc.clearDeploymentOverlap(d)
	}
	return nil
}

View File

@ -203,7 +203,7 @@ func (f *fixture) run(deploymentName string) {
c.rsStoreSynced = alwaysReady
c.podStoreSynced = alwaysReady
for _, d := range f.dStore {
c.dStore.Store.Add(d)
c.dStore.Indexer.Add(d)
}
for _, rs := range f.rsStore {
c.rsStore.Store.Add(rs)
@ -275,7 +275,7 @@ func TestDeploymentController_dontSyncDeploymentsWithEmptyPodSelector(t *testing
d := newDeployment(1, nil)
empty := unversioned.LabelSelector{}
d.Spec.Selector = &empty
controller.dStore.Store.Add(d)
controller.dStore.Indexer.Add(d)
// We expect the deployment controller to not take action here since it's configuration
// is invalid, even though no replicasets exist that match it's selector.
controller.syncDeployment(fmt.Sprintf("%s/%s", d.ObjectMeta.Namespace, d.ObjectMeta.Name))

View File

@ -58,6 +58,12 @@ const (
RollbackTemplateUnchanged = "DeploymentRollbackTemplateUnchanged"
// RollbackDone is the done rollback event reason
RollbackDone = "DeploymentRollback"
// OverlapAnnotation marks deployments with overlapping selector with other deployments
// TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210
OverlapAnnotation = "deployment.kubernetes.io/error-selector-overlapping-with"
// SelectorUpdateAnnotation records the last time the deployment's selector was updated
// TODO: Delete this annotation when we gracefully handle overlapping selectors. See https://github.com/kubernetes/kubernetes/issues/2210
SelectorUpdateAnnotation = "deployment.kubernetes.io/selector-updated-at"
)
// MaxRevision finds the highest revision in the replica sets
@ -791,3 +797,42 @@ func DeploymentDeepCopy(deployment *extensions.Deployment) (*extensions.Deployme
}
return copied, nil
}
// SelectorUpdatedBefore reports whether d1's selector was last updated strictly
// earlier than d2's, as determined by LastSelectorUpdate.
func SelectorUpdatedBefore(d1, d2 *extensions.Deployment) bool {
	first := LastSelectorUpdate(d1)
	second := LastSelectorUpdate(d2)
	return first.Before(second)
}
// LastSelectorUpdate returns the last time the given deployment's selector was updated.
// The time is read from the SelectorUpdateAnnotation annotation, which is expected to be
// in RFC 3339 format. If the annotation is missing, empty, or cannot be parsed, the
// deployment's creation timestamp is returned instead.
func LastSelectorUpdate(d *extensions.Deployment) unversioned.Time {
	t := d.Annotations[SelectorUpdateAnnotation]
	if len(t) > 0 {
		// time.Parse takes the layout first and the value to parse second.
		parsedTime, err := time.Parse(time.RFC3339, t)
		// If failed to parse the time, use creation timestamp instead
		if err != nil {
			return d.CreationTimestamp
		}
		return unversioned.Time{Time: parsedTime}
	}
	// If it's never updated, use creation timestamp instead
	return d.CreationTimestamp
}
// BySelectorLastUpdateTime implements sort.Interface for a slice of deployments.
// Deployments are ordered by the time their selector was last updated (earliest first);
// ties are broken by creation timestamp and then by name.
type BySelectorLastUpdateTime []extensions.Deployment

// Len returns the number of deployments in the slice.
func (o BySelectorLastUpdateTime) Len() int {
	return len(o)
}

// Swap exchanges the deployments at positions i and j.
func (o BySelectorLastUpdateTime) Swap(i, j int) {
	o[i], o[j] = o[j], o[i]
}

// Less reports whether the deployment at position i sorts before the one at position j.
func (o BySelectorLastUpdateTime) Less(i, j int) bool {
	ti, tj := LastSelectorUpdate(&o[i]), LastSelectorUpdate(&o[j])
	if !ti.Equal(tj) {
		return ti.Before(tj)
	}
	// Same selector update time: fall back to creation time, then to name.
	if !o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
		return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
	}
	return o[i].Name < o[j].Name
}

View File

@ -61,7 +61,7 @@ type DisruptionController struct {
rsController *framework.Controller
rsLister cache.StoreToReplicaSetLister
dStore cache.Store
dIndexer cache.Indexer
dController *framework.Controller
dLister cache.StoreToDeploymentLister
@ -155,7 +155,7 @@ func NewDisruptionController(podInformer framework.SharedIndexInformer, kubeClie
dc.rsLister.Store = dc.rsStore
dc.dStore, dc.dController = framework.NewInformer(
dc.dIndexer, dc.dController = framework.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return dc.kubeClient.Extensions().Deployments(api.NamespaceAll).List(options)
@ -167,9 +167,10 @@ func NewDisruptionController(podInformer framework.SharedIndexInformer, kubeClie
&extensions.Deployment{},
30*time.Second,
framework.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
dc.dLister.Store = dc.dStore
dc.dLister.Indexer = dc.dIndexer
return dc
}

View File

@ -81,7 +81,7 @@ func newFakeDisruptionController() (*DisruptionController, *pdbStates) {
podLister: cache.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
rcLister: cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
rsLister: cache.StoreToReplicaSetLister{Store: cache.NewStore(controller.KeyFunc)},
dLister: cache.StoreToDeploymentLister{Store: cache.NewStore(controller.KeyFunc)},
dLister: cache.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
getUpdater: func() updater { return ps.Set },
broadcaster: record.NewBroadcaster(),
}
@ -442,7 +442,7 @@ func TestTwoControllers(t *testing.T) {
d, _ := newDeployment(t, 11)
d.Spec.Selector = newSel(dLabels)
add(t, dc.dLister.Store, d)
add(t, dc.dLister.Indexer, d)
dc.sync(pdbName)
ps.VerifyPdbStatus(t, pdbName, true, 4, 4, 11)

View File

@ -2166,6 +2166,10 @@ func (dd *DeploymentDescriber) Describe(namespace, name string, describerSetting
}
fmt.Fprintf(out, "NewReplicaSet:\t%s\n", printReplicaSetsByLabels(newRSs))
}
overlapWith := d.Annotations[deploymentutil.OverlapAnnotation]
if len(overlapWith) > 0 {
fmt.Fprintf(out, "!!!WARNING!!! This deployment has overlapping label selector with deployment %q and won't behave as expected. Please fix it before continue.\n", overlapWith)
}
if describerSettings.ShowEvents {
events, err := dd.Core().Events(namespace).Search(d)
if err == nil && events != nil {

View File

@ -19,10 +19,13 @@ package deployment
import (
"fmt"
"reflect"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/extensions/validation"
"k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/generic"
@ -80,6 +83,15 @@ func (deploymentStrategy) PrepareForUpdate(ctx api.Context, obj, old runtime.Obj
!reflect.DeepEqual(newDeployment.Annotations, oldDeployment.Annotations) {
newDeployment.Generation = oldDeployment.Generation + 1
}
// Records timestamp on selector updates in annotation
if !reflect.DeepEqual(newDeployment.Spec.Selector, oldDeployment.Spec.Selector) {
if newDeployment.Annotations == nil {
newDeployment.Annotations = make(map[string]string)
}
now := unversioned.Now()
newDeployment.Annotations[util.SelectorUpdateAnnotation] = now.Format(time.RFC3339)
}
}
// ValidateUpdate is the default update validation for an end user.