mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Make PDBs represent percentage in StatefulSet
This commit is contained in:
parent
d12d012a55
commit
1508ecfe37
@ -23,6 +23,7 @@ import (
|
|||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
|
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
|
||||||
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||||
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||||
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
|
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
|
||||||
@ -51,7 +52,7 @@ const statusUpdateRetries = 2
|
|||||||
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
|
// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that
|
||||||
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
|
// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should
|
||||||
// be more than enough.
|
// be more than enough.
|
||||||
// If the cotroller is running on a different node it is important that the two nodes have synced
|
// If the controller is running on a different node it is important that the two nodes have synced
|
||||||
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
|
// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough
|
||||||
// protection against unwanted pod disruptions.
|
// protection against unwanted pod disruptions.
|
||||||
const DeletionTimeout = 2 * 60 * time.Second
|
const DeletionTimeout = 2 * 60 * time.Second
|
||||||
@ -80,6 +81,10 @@ type DisruptionController struct {
|
|||||||
dController *cache.Controller
|
dController *cache.Controller
|
||||||
dLister cache.StoreToDeploymentLister
|
dLister cache.StoreToDeploymentLister
|
||||||
|
|
||||||
|
ssStore cache.Store
|
||||||
|
ssController *cache.Controller
|
||||||
|
ssLister cache.StoreToStatefulSetLister
|
||||||
|
|
||||||
// PodDisruptionBudget keys that need to be synced.
|
// PodDisruptionBudget keys that need to be synced.
|
||||||
queue workqueue.RateLimitingInterface
|
queue workqueue.RateLimitingInterface
|
||||||
recheckQueue workqueue.DelayingInterface
|
recheckQueue workqueue.DelayingInterface
|
||||||
@ -187,9 +192,23 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient c
|
|||||||
cache.ResourceEventHandlerFuncs{},
|
cache.ResourceEventHandlerFuncs{},
|
||||||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
|
||||||
)
|
)
|
||||||
|
|
||||||
dc.dLister.Indexer = dc.dIndexer
|
dc.dLister.Indexer = dc.dIndexer
|
||||||
|
|
||||||
|
dc.ssStore, dc.ssController = cache.NewInformer(
|
||||||
|
&cache.ListWatch{
|
||||||
|
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
|
||||||
|
return dc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).List(options)
|
||||||
|
},
|
||||||
|
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
|
||||||
|
return dc.kubeClient.Apps().StatefulSets(v1.NamespaceAll).Watch(options)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&apps.StatefulSet{},
|
||||||
|
30*time.Second,
|
||||||
|
cache.ResourceEventHandlerFuncs{},
|
||||||
|
)
|
||||||
|
dc.ssLister.Store = dc.ssStore
|
||||||
|
|
||||||
return dc
|
return dc
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -201,7 +220,8 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient c
|
|||||||
// and we may well need further tweaks just to be able to access scale
|
// and we may well need further tweaks just to be able to access scale
|
||||||
// subresources.
|
// subresources.
|
||||||
func (dc *DisruptionController) finders() []podControllerFinder {
|
func (dc *DisruptionController) finders() []podControllerFinder {
|
||||||
return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets}
|
return []podControllerFinder{dc.getPodReplicationControllers, dc.getPodDeployments, dc.getPodReplicaSets,
|
||||||
|
dc.getPodStatefulSets}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getPodReplicaSets finds replicasets which have no matching deployments.
|
// getPodReplicaSets finds replicasets which have no matching deployments.
|
||||||
@ -231,6 +251,29 @@ func (dc *DisruptionController) getPodReplicaSets(pod *v1.Pod) ([]controllerAndS
|
|||||||
return cas, nil
|
return cas, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getPodStatefulSet returns the statefulset managing the given pod.
|
||||||
|
func (dc *DisruptionController) getPodStatefulSets(pod *v1.Pod) ([]controllerAndScale, error) {
|
||||||
|
cas := []controllerAndScale{}
|
||||||
|
ss, err := dc.ssLister.GetPodStatefulSets(pod)
|
||||||
|
|
||||||
|
// GetPodStatefulSets returns an error only if no StatefulSets are found. We
|
||||||
|
// don't return that as an error to the caller.
|
||||||
|
if err != nil {
|
||||||
|
return cas, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
controllerScale := map[types.UID]int32{}
|
||||||
|
for _, s := range ss {
|
||||||
|
controllerScale[s.UID] = *(s.Spec.Replicas)
|
||||||
|
}
|
||||||
|
|
||||||
|
for uid, scale := range controllerScale {
|
||||||
|
cas = append(cas, controllerAndScale{UID: uid, scale: scale})
|
||||||
|
}
|
||||||
|
|
||||||
|
return cas, nil
|
||||||
|
}
|
||||||
|
|
||||||
// getPodDeployments finds deployments for any replicasets which are being managed by deployments.
|
// getPodDeployments finds deployments for any replicasets which are being managed by deployments.
|
||||||
func (dc *DisruptionController) getPodDeployments(pod *v1.Pod) ([]controllerAndScale, error) {
|
func (dc *DisruptionController) getPodDeployments(pod *v1.Pod) ([]controllerAndScale, error) {
|
||||||
cas := []controllerAndScale{}
|
cas := []controllerAndScale{}
|
||||||
@ -284,6 +327,7 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
|
|||||||
go dc.rcController.Run(stopCh)
|
go dc.rcController.Run(stopCh)
|
||||||
go dc.rsController.Run(stopCh)
|
go dc.rsController.Run(stopCh)
|
||||||
go dc.dController.Run(stopCh)
|
go dc.dController.Run(stopCh)
|
||||||
|
go dc.ssController.Run(stopCh)
|
||||||
go wait.Until(dc.worker, time.Second, stopCh)
|
go wait.Until(dc.worker, time.Second, stopCh)
|
||||||
go wait.Until(dc.recheckWorker, time.Second, stopCh)
|
go wait.Until(dc.recheckWorker, time.Second, stopCh)
|
||||||
|
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
"k8s.io/kubernetes/pkg/apimachinery/registered"
|
||||||
|
apps "k8s.io/kubernetes/pkg/apis/apps/v1beta1"
|
||||||
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||||
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
|
||||||
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
|
policy "k8s.io/kubernetes/pkg/apis/policy/v1beta1"
|
||||||
@ -92,6 +93,7 @@ func newFakeDisruptionController() (*DisruptionController, *pdbStates) {
|
|||||||
rcLister: cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
|
rcLister: cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
|
||||||
rsLister: cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
|
rsLister: cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})},
|
||||||
dLister: cache.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
|
dLister: cache.StoreToDeploymentLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
|
||||||
|
ssLister: cache.StoreToStatefulSetLister{Store: cache.NewStore(controller.KeyFunc)},
|
||||||
getUpdater: func() updater { return ps.Set },
|
getUpdater: func() updater { return ps.Set },
|
||||||
broadcaster: record.NewBroadcaster(),
|
broadcaster: record.NewBroadcaster(),
|
||||||
}
|
}
|
||||||
@ -236,6 +238,30 @@ func newReplicaSet(t *testing.T, size int32) (*extensions.ReplicaSet, string) {
|
|||||||
return rs, rsName
|
return rs, rsName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newStatefulSet(t *testing.T, size int32) (*apps.StatefulSet, string) {
|
||||||
|
ss := &apps.StatefulSet{
|
||||||
|
TypeMeta: metav1.TypeMeta{APIVersion: registered.GroupOrDie(v1.GroupName).GroupVersion.String()},
|
||||||
|
ObjectMeta: v1.ObjectMeta{
|
||||||
|
UID: uuid.NewUUID(),
|
||||||
|
Name: "foobar",
|
||||||
|
Namespace: v1.NamespaceDefault,
|
||||||
|
ResourceVersion: "18",
|
||||||
|
Labels: fooBar(),
|
||||||
|
},
|
||||||
|
Spec: apps.StatefulSetSpec{
|
||||||
|
Replicas: &size,
|
||||||
|
Selector: newSelFooBar(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
ssName, err := controller.KeyFunc(ss)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Unexpected error naming StatefulSet %q: %v", ss.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ss, ssName
|
||||||
|
}
|
||||||
|
|
||||||
func update(t *testing.T, store cache.Store, obj interface{}) {
|
func update(t *testing.T, store cache.Store, obj interface{}) {
|
||||||
if err := store.Update(obj); err != nil {
|
if err := store.Update(obj); err != nil {
|
||||||
t.Fatalf("Could not add %+v to %+v: %v", obj, store, err)
|
t.Fatalf("Could not add %+v to %+v: %v", obj, store, err)
|
||||||
@ -409,6 +435,41 @@ func TestReplicationController(t *testing.T) {
|
|||||||
ps.VerifyDisruptionAllowed(t, pdbName, 0)
|
ps.VerifyDisruptionAllowed(t, pdbName, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStatefulSetController(t *testing.T) {
|
||||||
|
labels := map[string]string{
|
||||||
|
"foo": "bar",
|
||||||
|
"baz": "quux",
|
||||||
|
}
|
||||||
|
|
||||||
|
dc, ps := newFakeDisruptionController()
|
||||||
|
|
||||||
|
// 34% should round up to 2
|
||||||
|
pdb, pdbName := newPodDisruptionBudget(t, intstr.FromString("34%"))
|
||||||
|
add(t, dc.pdbLister.Store, pdb)
|
||||||
|
ss, _ := newStatefulSet(t, 3)
|
||||||
|
add(t, dc.ssLister.Store, ss)
|
||||||
|
dc.sync(pdbName)
|
||||||
|
|
||||||
|
// It starts out at 0 expected because, with no pods, the PDB doesn't know
|
||||||
|
// about the SS. This is a known bug. TODO(mml): file issue
|
||||||
|
ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
|
||||||
|
|
||||||
|
pods := []*v1.Pod{}
|
||||||
|
|
||||||
|
for i := int32(0); i < 3; i++ {
|
||||||
|
pod, _ := newPod(t, fmt.Sprintf("foobar %d", i))
|
||||||
|
pods = append(pods, pod)
|
||||||
|
pod.Labels = labels
|
||||||
|
add(t, dc.podLister.Indexer, pod)
|
||||||
|
dc.sync(pdbName)
|
||||||
|
if i < 2 {
|
||||||
|
ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
|
||||||
|
} else {
|
||||||
|
ps.VerifyPdbStatus(t, pdbName, 1, 3, 2, 3, map[string]metav1.Time{})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestTwoControllers(t *testing.T) {
|
func TestTwoControllers(t *testing.T) {
|
||||||
// Most of this test is in verifying intermediate cases as we define the
|
// Most of this test is in verifying intermediate cases as we define the
|
||||||
// three controllers and create the pods.
|
// three controllers and create the pods.
|
||||||
|
Loading…
Reference in New Issue
Block a user