mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Merge pull request #36748 from kargakis/remove-events-from-deployment-tests
Automatic merge from submit-queue Fix Recreate for Deployments and stop using events in e2e tests Fixes https://github.com/kubernetes/kubernetes/issues/36453 by removing events from the deployment tests. The test about events during a Rolling deployment is redundant so I just removed it (we already have another test specifically for Rolling deployments). Closes https://github.com/kubernetes/kubernetes/issues/32567 (preferred to use pod LISTs instead of a new status API field for replica sets that would add many more writes to replica sets). @kubernetes/deployment
This commit is contained in:
commit
5b240ca897
@ -65,7 +65,7 @@ func (m *PodControllerRefManager) Classify(pods []*v1.Pod) (
|
||||
pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp)
|
||||
continue
|
||||
}
|
||||
controllerRef := getControllerOf(pod.ObjectMeta)
|
||||
controllerRef := GetControllerOf(pod.ObjectMeta)
|
||||
if controllerRef != nil {
|
||||
if controllerRef.UID == m.controllerObject.UID {
|
||||
// already controlled
|
||||
@ -90,9 +90,9 @@ func (m *PodControllerRefManager) Classify(pods []*v1.Pod) (
|
||||
return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch
|
||||
}
|
||||
|
||||
// getControllerOf returns the controllerRef if controllee has a controller,
|
||||
// GetControllerOf returns the controllerRef if controllee has a controller,
|
||||
// otherwise returns nil.
|
||||
func getControllerOf(controllee v1.ObjectMeta) *metav1.OwnerReference {
|
||||
func GetControllerOf(controllee v1.ObjectMeta) *metav1.OwnerReference {
|
||||
for _, owner := range controllee.OwnerReferences {
|
||||
// controlled by other controller
|
||||
if owner.Controller != nil && *owner.Controller == true {
|
||||
|
@ -29,7 +29,6 @@ go_library(
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset/typed/core/v1:go_default_library",
|
||||
"//pkg/client/record:go_default_library",
|
||||
"//pkg/client/retry:go_default_library",
|
||||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/deployment/util:go_default_library",
|
||||
"//pkg/controller/informers:go_default_library",
|
||||
@ -49,12 +48,14 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"deployment_controller_test.go",
|
||||
"recreate_test.go",
|
||||
"rolling_test.go",
|
||||
"sync_test.go",
|
||||
],
|
||||
library = "go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/apimachinery/registered:go_default_library",
|
||||
"//pkg/apis/extensions/v1beta1:go_default_library",
|
||||
|
@ -117,6 +117,9 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
|
||||
UpdateFunc: dc.updateReplicaSet,
|
||||
DeleteFunc: dc.deleteReplicaSet,
|
||||
})
|
||||
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
DeleteFunc: dc.deletePod,
|
||||
})
|
||||
|
||||
dc.syncHandler = dc.syncDeployment
|
||||
dc.dLister = dInformer.Lister()
|
||||
@ -167,12 +170,12 @@ func (dc *DeploymentController) deleteDeployment(obj interface{}) {
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
|
||||
return
|
||||
}
|
||||
d, ok = tombstone.Obj.(*extensions.Deployment)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a Deployment %#v", obj)
|
||||
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Deployment %#v", obj))
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -202,7 +205,8 @@ func (dc *DeploymentController) getDeploymentForReplicaSet(rs *extensions.Replic
|
||||
// trying to clean up one of the controllers, for now we just return the older one
|
||||
if len(deployments) > 1 {
|
||||
sort.Sort(util.BySelectorLastUpdateTime(deployments))
|
||||
glog.Errorf("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s", rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
|
||||
glog.V(4).Infof("user error! more than one deployment is selecting replica set %s/%s with labels: %#v, returning %s/%s",
|
||||
rs.Namespace, rs.Name, rs.Labels, deployments[0].Namespace, deployments[0].Name)
|
||||
}
|
||||
return deployments[0]
|
||||
}
|
||||
@ -246,12 +250,12 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
|
||||
return
|
||||
}
|
||||
rs, ok = tombstone.Obj.(*extensions.ReplicaSet)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod)
|
||||
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a ReplicaSet %#v, could take up to %v before a deployment recreates/updates replicasets", obj, FullDeploymentResyncPeriod))
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -261,20 +265,48 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// deletePod will enqueue a Recreate Deployment once all of its pods have stopped running.
|
||||
func (dc *DeploymentController) deletePod(obj interface{}) {
|
||||
pod, ok := obj.(*v1.Pod)
|
||||
|
||||
// When a delete is dropped, the relist will notice a pod in the store not
|
||||
// in the list, leading to the insertion of a tombstone object which contains
|
||||
// the deleted key/value. Note that this value might be stale. If the Pod
|
||||
// changed labels the new deployment will not be woken up till the periodic resync.
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v, could take up to %v before a deployment recreates/updates pod", obj, FullDeploymentResyncPeriod))
|
||||
return
|
||||
}
|
||||
pod, ok = tombstone.Obj.(*v1.Pod)
|
||||
if !ok {
|
||||
utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a pod %#v, could take up to %v before a deployment recreates/updates pods", obj, FullDeploymentResyncPeriod))
|
||||
return
|
||||
}
|
||||
}
|
||||
if d := dc.getDeploymentForPod(pod); d != nil && d.Spec.Strategy.Type == extensions.RecreateDeploymentStrategyType {
|
||||
podList, err := dc.listPods(d)
|
||||
if err == nil && len(podList.Items) == 0 {
|
||||
dc.enqueueDeployment(d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deployment) {
|
||||
key, err := controller.KeyFunc(deployment)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %#v: %v", deployment, err)
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
|
||||
return
|
||||
}
|
||||
|
||||
dc.queue.Add(key)
|
||||
}
|
||||
|
||||
// enqueueAfter will enqueue a deployment after the provided amount of time in a secondary queue.
|
||||
// checkProgressAfter will enqueue a deployment after the provided amount of time in a secondary queue.
|
||||
// Once the deployment is popped out of the secondary queue, it is checked for progress and requeued
|
||||
// back to the main queue iff it has failed progressing.
|
||||
func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment, after time.Duration) {
|
||||
func (dc *DeploymentController) checkProgressAfter(deployment *extensions.Deployment, after time.Duration) {
|
||||
key, err := controller.KeyFunc(deployment)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", deployment, err))
|
||||
@ -284,6 +316,42 @@ func (dc *DeploymentController) enqueueAfter(deployment *extensions.Deployment,
|
||||
dc.progressQueue.AddAfter(key, after)
|
||||
}
|
||||
|
||||
// getDeploymentForPod returns the deployment managing the given Pod.
|
||||
func (dc *DeploymentController) getDeploymentForPod(pod *v1.Pod) *extensions.Deployment {
|
||||
// Find the owning replica set
|
||||
var rs *extensions.ReplicaSet
|
||||
var err error
|
||||
// Look at the owner reference
|
||||
controllerRef := controller.GetControllerOf(pod.ObjectMeta)
|
||||
if controllerRef != nil {
|
||||
// Not a pod owned by a replica set.
|
||||
if controllerRef.Kind != extensions.SchemeGroupVersion.WithKind("ReplicaSet").Kind {
|
||||
return nil
|
||||
}
|
||||
rs, err = dc.rsLister.ReplicaSets(pod.Namespace).Get(controllerRef.Name)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Cannot get replicaset %q for pod %q: %v", controllerRef.Name, pod.Name, err)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
// Fallback to listing replica sets.
|
||||
rss, err := dc.rsLister.GetPodReplicaSets(pod)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("Cannot list replica sets for pod %q: %v", pod.Name, err)
|
||||
return nil
|
||||
}
|
||||
// TODO: Handle multiple replica sets gracefully
|
||||
// For now we return the oldest replica set.
|
||||
if len(rss) > 1 {
|
||||
utilruntime.HandleError(fmt.Errorf("more than one ReplicaSet is selecting pod %q with labels: %+v", pod.Name, pod.Labels))
|
||||
sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
|
||||
}
|
||||
rs = rss[0]
|
||||
}
|
||||
|
||||
return dc.getDeploymentForReplicaSet(rs)
|
||||
}
|
||||
|
||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||
// It enforces that the syncHandler is never invoked concurrently with the same key.
|
||||
func (dc *DeploymentController) worker() {
|
||||
@ -332,7 +400,7 @@ func (dc *DeploymentController) syncDeployment(key string) error {
|
||||
|
||||
obj, exists, err := dc.dLister.Indexer.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Errorf("Unable to retrieve deployment %v from store: %v", key, err)
|
||||
utilruntime.HandleError(fmt.Errorf("Unable to retrieve deployment %v from store: %v", key, err))
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
|
@ -231,7 +231,9 @@ func TestSyncDeploymentDontDoAnythingDuringDeletion(t *testing.T) {
|
||||
now := metav1.Now()
|
||||
d.DeletionTimestamp = &now
|
||||
f.dLister = append(f.dLister, d)
|
||||
f.objects = append(f.objects, d)
|
||||
|
||||
f.expectUpdateDeploymentStatusAction(d)
|
||||
f.run(getKey(d, t))
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error
|
||||
// See https://github.com/kubernetes/kubernetes/issues/18568
|
||||
|
||||
allRSs := append(oldRSs, newRS)
|
||||
newStatus := dc.calculateStatus(allRSs, newRS, d)
|
||||
newStatus := calculateStatus(allRSs, newRS, d)
|
||||
|
||||
// If the deployment is complete or it is progressing, there is no need to check if it
|
||||
// has timed out.
|
||||
@ -77,7 +77,7 @@ func (dc *DeploymentController) hasFailed(d *extensions.Deployment) (bool, error
|
||||
// for example a resync of the deployment after it was scaled up. In those cases,
|
||||
// we shouldn't try to estimate any progress.
|
||||
func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
|
||||
newStatus := dc.calculateStatus(allRSs, newRS, d)
|
||||
newStatus := calculateStatus(allRSs, newRS, d)
|
||||
|
||||
// If there is no progressDeadlineSeconds set, remove any Progressing condition.
|
||||
if d.Spec.ProgressDeadlineSeconds == nil {
|
||||
@ -88,23 +88,25 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
|
||||
// a new rollout and this is a resync where we don't need to estimate any progress.
|
||||
// In such a case, we should simply not estimate any progress for this deployment.
|
||||
currentCond := util.GetDeploymentCondition(d.Status, extensions.DeploymentProgressing)
|
||||
isResyncEvent := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
|
||||
isCompleteDeployment := newStatus.Replicas == newStatus.UpdatedReplicas && currentCond != nil && currentCond.Reason == util.NewRSAvailableReason
|
||||
// Check for progress only if there is a progress deadline set and the latest rollout
|
||||
// hasn't completed yet. We also need to ensure the new replica set exists, otherwise
|
||||
// we cannot estimate any progress.
|
||||
if d.Spec.ProgressDeadlineSeconds != nil && !isResyncEvent && newRS != nil {
|
||||
// hasn't completed yet.
|
||||
if d.Spec.ProgressDeadlineSeconds != nil && !isCompleteDeployment {
|
||||
switch {
|
||||
case util.DeploymentComplete(d, &newStatus):
|
||||
// Update the deployment conditions with a message for the new replica set that
|
||||
// was successfully deployed. If the condition already exists, we ignore this update.
|
||||
msg := fmt.Sprintf("Replica set %q has successfully progressed.", newRS.Name)
|
||||
msg := fmt.Sprintf("ReplicaSet %q has successfully progressed.", newRS.Name)
|
||||
condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionTrue, util.NewRSAvailableReason, msg)
|
||||
util.SetDeploymentCondition(&newStatus, *condition)
|
||||
|
||||
case util.DeploymentProgressing(d, &newStatus):
|
||||
// If there is any progress made, continue by not checking if the deployment failed. This
|
||||
// behavior emulates the rolling updater progressDeadline check.
|
||||
msg := fmt.Sprintf("Replica set %q is progressing.", newRS.Name)
|
||||
msg := fmt.Sprintf("Deployment %q is progressing.", d.Name)
|
||||
if newRS != nil {
|
||||
msg = fmt.Sprintf("ReplicaSet %q is progressing.", newRS.Name)
|
||||
}
|
||||
condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionTrue, util.ReplicaSetUpdatedReason, msg)
|
||||
// Update the current Progressing condition or add a new one if it doesn't exist.
|
||||
// If a Progressing condition with status=true already exists, we should update
|
||||
@ -124,7 +126,10 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
|
||||
case util.DeploymentTimedOut(d, &newStatus):
|
||||
// Update the deployment with a timeout condition. If the condition already exists,
|
||||
// we ignore this update.
|
||||
msg := fmt.Sprintf("Replica set %q has timed out progressing.", newRS.Name)
|
||||
msg := fmt.Sprintf("Deployment %q has timed out progressing.", d.Name)
|
||||
if newRS != nil {
|
||||
msg = fmt.Sprintf("ReplicaSet %q has timed out progressing.", newRS.Name)
|
||||
}
|
||||
condition := util.NewDeploymentCondition(extensions.DeploymentProgressing, v1.ConditionFalse, util.TimedOutReason, msg)
|
||||
util.SetDeploymentCondition(&newStatus, *condition)
|
||||
}
|
||||
@ -154,7 +159,7 @@ func (dc *DeploymentController) syncRolloutStatus(allRSs []*extensions.ReplicaSe
|
||||
|
||||
after := time.Now().Add(time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second).Sub(currentCond.LastUpdateTime.Time)
|
||||
glog.V(4).Infof("Queueing up deployment %q for a progress check after %ds", d.Name, int(after.Seconds()))
|
||||
dc.enqueueAfter(d, after)
|
||||
dc.checkProgressAfter(d, after)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -17,12 +17,8 @@ limitations under the License.
|
||||
package deployment
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/client/retry"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
// rolloutRecreate implements the logic for recreating a replica set.
|
||||
@ -45,9 +41,16 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
|
||||
return dc.syncRolloutStatus(allRSs, newRS, deployment)
|
||||
}
|
||||
|
||||
// Wait for all old replica set to scale down to zero.
|
||||
if err := dc.waitForInactiveReplicaSets(activeOldRSs); err != nil {
|
||||
return err
|
||||
newStatus := calculateStatus(allRSs, newRS, deployment)
|
||||
// Do not process a deployment when it has old pods running.
|
||||
if newStatus.UpdatedReplicas == 0 {
|
||||
podList, err := dc.listPods(deployment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(podList.Items) > 0 {
|
||||
return dc.syncRolloutStatus(allRSs, newRS, deployment)
|
||||
}
|
||||
}
|
||||
|
||||
// If we need to create a new RS, create it now
|
||||
@ -97,40 +100,6 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext
|
||||
return scaled, nil
|
||||
}
|
||||
|
||||
// waitForInactiveReplicaSets will wait until all passed replica sets are inactive and have been noticed
|
||||
// by the replica set controller.
|
||||
func (dc *DeploymentController) waitForInactiveReplicaSets(oldRSs []*extensions.ReplicaSet) error {
|
||||
for i := range oldRSs {
|
||||
rs := oldRSs[i]
|
||||
desiredGeneration := rs.Generation
|
||||
observedGeneration := rs.Status.ObservedGeneration
|
||||
specReplicas := *(rs.Spec.Replicas)
|
||||
statusReplicas := rs.Status.Replicas
|
||||
|
||||
if err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
|
||||
replicaSet, err := dc.rsLister.ReplicaSets(rs.Namespace).Get(rs.Name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
specReplicas = *(replicaSet.Spec.Replicas)
|
||||
statusReplicas = replicaSet.Status.Replicas
|
||||
observedGeneration = replicaSet.Status.ObservedGeneration
|
||||
|
||||
// TODO: We also need to wait for terminating replicas to actually terminate.
|
||||
// See https://github.com/kubernetes/kubernetes/issues/32567
|
||||
return observedGeneration >= desiredGeneration && *(replicaSet.Spec.Replicas) == 0 && replicaSet.Status.Replicas == 0, nil
|
||||
}); err != nil {
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("replica set %q never became inactive: synced=%t, spec.replicas=%d, status.replicas=%d",
|
||||
rs.Name, observedGeneration >= desiredGeneration, specReplicas, statusReplicas)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate"
|
||||
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
|
||||
scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
|
||||
|
82
pkg/controller/deployment/recreate_test.go
Normal file
82
pkg/controller/deployment/recreate_test.go
Normal file
@ -0,0 +1,82 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package deployment
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/informers"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
)
|
||||
|
||||
func TestScaleDownOldReplicaSets(t *testing.T) {
|
||||
tests := []struct {
|
||||
oldRSSizes []int
|
||||
d *extensions.Deployment
|
||||
}{
|
||||
{
|
||||
oldRSSizes: []int{3},
|
||||
d: newDeployment("foo", 3, nil, nil, nil, map[string]string{"foo": "bar"}),
|
||||
},
|
||||
}
|
||||
|
||||
for i := range tests {
|
||||
t.Logf("running scenario %d", i)
|
||||
test := tests[i]
|
||||
|
||||
var oldRSs []*extensions.ReplicaSet
|
||||
var expected []runtime.Object
|
||||
|
||||
for n, size := range test.oldRSSizes {
|
||||
rs := newReplicaSet(test.d, fmt.Sprintf("%s-%d", test.d.Name, n), size)
|
||||
oldRSs = append(oldRSs, rs)
|
||||
|
||||
objCopy, err := api.Scheme.Copy(rs)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error while deep-copying: %v", err)
|
||||
continue
|
||||
}
|
||||
rsCopy := objCopy.(*extensions.ReplicaSet)
|
||||
|
||||
zero := int32(0)
|
||||
rsCopy.Spec.Replicas = &zero
|
||||
expected = append(expected, rsCopy)
|
||||
|
||||
if *(oldRSs[n].Spec.Replicas) == *(expected[n].(*extensions.ReplicaSet).Spec.Replicas) {
|
||||
t.Errorf("broken test - original and expected RS have the same size")
|
||||
}
|
||||
}
|
||||
|
||||
kc := fake.NewSimpleClientset(expected...)
|
||||
informers := informers.NewSharedInformerFactory(kc, nil, controller.NoResyncPeriodFunc())
|
||||
c := NewDeploymentController(informers.Deployments(), informers.ReplicaSets(), informers.Pods(), kc)
|
||||
|
||||
c.scaleDownOldReplicaSetsForRecreate(oldRSs, test.d)
|
||||
for j := range oldRSs {
|
||||
rs := oldRSs[j]
|
||||
|
||||
if *rs.Spec.Replicas != 0 {
|
||||
t.Errorf("rs %q has non-zero replicas", rs.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -558,7 +558,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSe
|
||||
|
||||
// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
|
||||
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
|
||||
newStatus := dc.calculateStatus(allRSs, newRS, d)
|
||||
newStatus := calculateStatus(allRSs, newRS, d)
|
||||
|
||||
if reflect.DeepEqual(d.Status, newStatus) {
|
||||
return nil
|
||||
@ -570,7 +570,8 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.Replic
|
||||
return err
|
||||
}
|
||||
|
||||
func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) extensions.DeploymentStatus {
|
||||
// calculateStatus calculates the latest status for the provided deployment by looking into the provided replica sets.
|
||||
func calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) extensions.DeploymentStatus {
|
||||
availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
|
||||
totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
|
||||
unavailableReplicas := totalReplicas - availableReplicas
|
||||
@ -580,23 +581,30 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet,
|
||||
unavailableReplicas = 0
|
||||
}
|
||||
|
||||
if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
|
||||
minAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
|
||||
deploymentutil.SetDeploymentCondition(&deployment.Status, *minAvailability)
|
||||
} else {
|
||||
noMinAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
|
||||
deploymentutil.SetDeploymentCondition(&deployment.Status, *noMinAvailability)
|
||||
}
|
||||
|
||||
return extensions.DeploymentStatus{
|
||||
status := extensions.DeploymentStatus{
|
||||
// TODO: Ensure that if we start retrying status updates, we won't pick up a new Generation value.
|
||||
ObservedGeneration: deployment.Generation,
|
||||
Replicas: deploymentutil.GetActualReplicaCountForReplicaSets(allRSs),
|
||||
UpdatedReplicas: deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}),
|
||||
AvailableReplicas: availableReplicas,
|
||||
UnavailableReplicas: unavailableReplicas,
|
||||
Conditions: deployment.Status.Conditions,
|
||||
}
|
||||
|
||||
// Copy conditions one by one so we won't mutate the original object.
|
||||
conditions := deployment.Status.Conditions
|
||||
for i := range conditions {
|
||||
status.Conditions = append(status.Conditions, conditions[i])
|
||||
}
|
||||
|
||||
if availableReplicas >= *(deployment.Spec.Replicas)-deploymentutil.MaxUnavailable(*deployment) {
|
||||
minAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, v1.ConditionTrue, deploymentutil.MinimumReplicasAvailable, "Deployment has minimum availability.")
|
||||
deploymentutil.SetDeploymentCondition(&status, *minAvailability)
|
||||
} else {
|
||||
noMinAvailability := deploymentutil.NewDeploymentCondition(extensions.DeploymentAvailable, v1.ConditionFalse, deploymentutil.MinimumReplicasUnavailable, "Deployment does not have minimum availability.")
|
||||
deploymentutil.SetDeploymentCondition(&status, *noMinAvailability)
|
||||
}
|
||||
|
||||
return status
|
||||
}
|
||||
|
||||
// isScalingEvent checks whether the provided deployment has been updated with a scaling event
|
||||
|
@ -207,7 +207,7 @@ func (rsc *ReplicaSetController) getPodReplicaSet(pod *v1.Pod) *extensions.Repli
|
||||
// overlap, sort by creation timestamp, subsort by name, then pick
|
||||
// the first.
|
||||
utilruntime.HandleError(fmt.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels))
|
||||
sort.Sort(overlappingReplicaSets(rss))
|
||||
sort.Sort(controller.ReplicaSetsByCreationTimestamp(rss))
|
||||
}
|
||||
|
||||
// update lookup cache
|
||||
|
@ -83,19 +83,6 @@ func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs exte
|
||||
}
|
||||
}
|
||||
|
||||
// overlappingReplicaSets sorts a list of ReplicaSets by creation timestamp, using their names as a tie breaker.
|
||||
type overlappingReplicaSets []*extensions.ReplicaSet
|
||||
|
||||
func (o overlappingReplicaSets) Len() int { return len(o) }
|
||||
func (o overlappingReplicaSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
|
||||
|
||||
func (o overlappingReplicaSets) Less(i, j int) bool {
|
||||
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
|
||||
return o[i].Name < o[j].Name
|
||||
}
|
||||
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
|
||||
}
|
||||
|
||||
func calculateStatus(rs extensions.ReplicaSet, filteredPods []*v1.Pod, manageReplicasErr error) extensions.ReplicaSetStatus {
|
||||
newStatus := rs.Status
|
||||
// Count the number of pods that have labels matching the labels of the pod
|
||||
|
@ -66,9 +66,6 @@ var _ = framework.KubeDescribe("Deployment", func() {
|
||||
It("RollingUpdateDeployment should delete old pods and create new ones", func() {
|
||||
testRollingUpdateDeployment(f)
|
||||
})
|
||||
It("RollingUpdateDeployment should scale up and down in the right order", func() {
|
||||
testRollingUpdateDeploymentEvents(f)
|
||||
})
|
||||
It("RecreateDeployment should delete old pods and create new ones", func() {
|
||||
testRecreateDeployment(f)
|
||||
})
|
||||
@ -314,7 +311,13 @@ func testRollingUpdateDeployment(f *framework.Framework) {
|
||||
|
||||
rsName := "test-rolling-update-controller"
|
||||
replicas := int32(3)
|
||||
_, err := c.Extensions().ReplicaSets(ns).Create(newRS(rsName, replicas, rsPodLabels, nginxImageName, nginxImage))
|
||||
rsRevision := "3546343826724305832"
|
||||
annotations := make(map[string]string)
|
||||
annotations[deploymentutil.RevisionAnnotation] = rsRevision
|
||||
rs := newRS(rsName, replicas, rsPodLabels, nginxImageName, nginxImage)
|
||||
rs.Annotations = annotations
|
||||
By(fmt.Sprintf("Creating replica set %q (going to be adopted)", rs.Name))
|
||||
_, err := c.Extensions().ReplicaSets(ns).Create(rs)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
// Verify that the required pods have come up.
|
||||
err = framework.VerifyPods(c, ns, "sample-pod", false, 3)
|
||||
@ -325,18 +328,21 @@ func testRollingUpdateDeployment(f *framework.Framework) {
|
||||
|
||||
// Create a deployment to delete nginx pods and instead bring up redis pods.
|
||||
deploymentName := "test-rolling-update-deployment"
|
||||
framework.Logf("Creating deployment %s", deploymentName)
|
||||
By(fmt.Sprintf("Creating deployment %q", deploymentName))
|
||||
deploy, err := c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, replicas, deploymentPodLabels, redisImageName, redisImage, extensions.RollingUpdateDeploymentStrategyType, nil))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Wait for it to be updated to revision 1
|
||||
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "1", redisImage)
|
||||
// Wait for it to be updated to revision 3546343826724305833.
|
||||
By(fmt.Sprintf("Ensuring deployment %q gets the next revision from the one the adopted replica set %q has", deploy.Name, rs.Name))
|
||||
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "3546343826724305833", redisImage)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Ensuring status for deployment %q is the expected", deploy.Name))
|
||||
err = framework.WaitForDeploymentStatus(c, deploy)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// There should be 1 old RS (nginx-controller, which is adopted)
|
||||
By(fmt.Sprintf("Ensuring deployment %q has one old replica set (the one it adopted)", deploy.Name))
|
||||
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
_, allOldRSs, err := deploymentutil.GetOldReplicaSets(deployment, c)
|
||||
@ -348,114 +354,34 @@ func testRollingUpdateDeployment(f *framework.Framework) {
|
||||
Expect(len(allOldRSs[0].Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0))
|
||||
}
|
||||
|
||||
func testRollingUpdateDeploymentEvents(f *framework.Framework) {
|
||||
ns := f.Namespace.Name
|
||||
c := f.ClientSet
|
||||
// Create nginx pods.
|
||||
deploymentPodLabels := map[string]string{"name": "sample-pod-2"}
|
||||
rsPodLabels := map[string]string{
|
||||
"name": "sample-pod-2",
|
||||
"pod": nginxImageName,
|
||||
}
|
||||
rsName := "test-rolling-scale-controller"
|
||||
replicas := int32(1)
|
||||
|
||||
rsRevision := "3546343826724305832"
|
||||
annotations := make(map[string]string)
|
||||
annotations[deploymentutil.RevisionAnnotation] = rsRevision
|
||||
rs := newRS(rsName, replicas, rsPodLabels, nginxImageName, nginxImage)
|
||||
rs.Annotations = annotations
|
||||
|
||||
_, err := c.Extensions().ReplicaSets(ns).Create(rs)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
// Verify that the required pods have come up.
|
||||
err = framework.VerifyPods(c, ns, "sample-pod-2", false, 1)
|
||||
if err != nil {
|
||||
framework.Logf("error in waiting for pods to come up: %s", err)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
// Create a deployment to delete nginx pods and instead bring up redis pods.
|
||||
deploymentName := "test-rolling-scale-deployment"
|
||||
framework.Logf("Creating deployment %s", deploymentName)
|
||||
deploy, err := c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, replicas, deploymentPodLabels, redisImageName, redisImage, extensions.RollingUpdateDeploymentStrategyType, nil))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Wait for it to be updated to revision 3546343826724305833
|
||||
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "3546343826724305833", redisImage)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
err = framework.WaitForDeploymentStatus(c, deploy)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
// Verify that the pods were scaled up and down as expected. We use events to verify that.
|
||||
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
framework.WaitForEvents(c, ns, deployment, 2)
|
||||
events, err := c.Core().Events(ns).Search(deployment)
|
||||
if err != nil {
|
||||
framework.Logf("error in listing events: %s", err)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
// There should be 2 events, one to scale up the new ReplicaSet and then to scale down
|
||||
// the old ReplicaSet.
|
||||
Expect(len(events.Items)).Should(Equal(2))
|
||||
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(newRS).NotTo(Equal(nil))
|
||||
Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 1", newRS.Name)))
|
||||
Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName)))
|
||||
}
|
||||
|
||||
func testRecreateDeployment(f *framework.Framework) {
|
||||
ns := f.Namespace.Name
|
||||
c := f.ClientSet
|
||||
// Create nginx pods.
|
||||
deploymentPodLabels := map[string]string{"name": "sample-pod-3"}
|
||||
rsPodLabels := map[string]string{
|
||||
"name": "sample-pod-3",
|
||||
"pod": nginxImageName,
|
||||
}
|
||||
|
||||
rsName := "test-recreate-controller"
|
||||
replicas := int32(3)
|
||||
_, err := c.Extensions().ReplicaSets(ns).Create(newRS(rsName, replicas, rsPodLabels, nginxImageName, nginxImage))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
// Verify that the required pods have come up.
|
||||
err = framework.VerifyPods(c, ns, "sample-pod-3", false, 3)
|
||||
if err != nil {
|
||||
framework.Logf("error in waiting for pods to come up: %s", err)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
// Create a deployment to delete nginx pods and instead bring up redis pods.
|
||||
// Create a deployment that brings up redis pods.
|
||||
deploymentName := "test-recreate-deployment"
|
||||
framework.Logf("Creating deployment %s", deploymentName)
|
||||
deploy, err := c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, replicas, deploymentPodLabels, redisImageName, redisImage, extensions.RecreateDeploymentStrategyType, nil))
|
||||
By(fmt.Sprintf("Creating deployment %q", deploymentName))
|
||||
deployment, err := c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, int32(3), map[string]string{"name": "sample-pod-3"}, redisImageName, redisImage, extensions.RecreateDeploymentStrategyType, nil))
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Wait for it to be updated to revision 1
|
||||
By(fmt.Sprintf("Waiting deployment %q to be updated to revision 1", deploymentName))
|
||||
err = framework.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, "1", redisImage)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
err = framework.WaitForDeploymentStatus(c, deploy)
|
||||
By(fmt.Sprintf("Waiting deployment %q to complete", deploymentName))
|
||||
Expect(framework.WaitForDeploymentStatusValid(c, deployment)).NotTo(HaveOccurred())
|
||||
|
||||
// Update deployment to delete redis pods and bring up nginx pods.
|
||||
By(fmt.Sprintf("Triggering a new rollout for deployment %q", deploymentName))
|
||||
deployment, err = framework.UpdateDeploymentWithRetries(c, ns, deploymentName, func(update *extensions.Deployment) {
|
||||
update.Spec.Template.Spec.Containers[0].Name = nginxImageName
|
||||
update.Spec.Template.Spec.Containers[0].Image = nginxImage
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Verify that the pods were scaled up and down as expected. We use events to verify that.
|
||||
deployment, err := c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
framework.WaitForEvents(c, ns, deployment, 2)
|
||||
events, err := c.Core().Events(ns).Search(deployment)
|
||||
if err != nil {
|
||||
framework.Logf("error in listing events: %s", err)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
// There should be 2 events, one to scale up the new ReplicaSet and then to scale down the old ReplicaSet.
|
||||
Expect(len(events.Items)).Should(Equal(2))
|
||||
newRS, err := deploymentutil.GetNewReplicaSet(deployment, c)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(newRS).NotTo(Equal(nil))
|
||||
Expect(events.Items[0].Message).Should(Equal(fmt.Sprintf("Scaled down replica set %s to 0", rsName)))
|
||||
Expect(events.Items[1].Message).Should(Equal(fmt.Sprintf("Scaled up replica set %s to 3", newRS.Name)))
|
||||
By(fmt.Sprintf("Watching deployment %q to verify that new pods will not run with olds pods", deploymentName))
|
||||
Expect(framework.WatchRecreateDeployment(c, deployment)).NotTo(HaveOccurred())
|
||||
}
|
||||
|
||||
// testDeploymentCleanUpPolicy tests that deployment supports cleanup policy
|
||||
@ -494,6 +420,7 @@ func testDeploymentCleanUpPolicy(f *framework.Framework) {
|
||||
}
|
||||
stopCh := make(chan struct{})
|
||||
w, err := c.Core().Pods(ns).Watch(options)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
go func() {
|
||||
// There should be only one pod being created, which is the pod with the redis image.
|
||||
// The old RS shouldn't create new pod when deployment controller adding pod template hash label to its selector.
|
||||
|
@ -3290,6 +3290,40 @@ func WaitForDeploymentRollbackCleared(c clientset.Interface, ns, deploymentName
|
||||
return nil
|
||||
}
|
||||
|
||||
// WatchRecreateDeployment watches Recreate deployments and ensures no new pods will run at the same time with
|
||||
// old pods.
|
||||
func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) error {
|
||||
if d.Spec.Strategy.Type != extensions.RecreateDeploymentStrategyType {
|
||||
return fmt.Errorf("deployment %q does not use a Recreate strategy: %s", d.Name, d.Spec.Strategy.Type)
|
||||
}
|
||||
|
||||
w, err := c.Extensions().Deployments(d.Namespace).Watch(v1.SingleObject(v1.ObjectMeta{Name: d.Name, ResourceVersion: d.ResourceVersion}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
status := d.Status
|
||||
|
||||
condition := func(event watch.Event) (bool, error) {
|
||||
d := event.Object.(*extensions.Deployment)
|
||||
status = d.Status
|
||||
|
||||
if d.Status.UpdatedReplicas > 0 && d.Status.Replicas != d.Status.UpdatedReplicas {
|
||||
return false, fmt.Errorf("deployment %q is running new pods alongside old pods: %#v", d.Name, status)
|
||||
}
|
||||
|
||||
return *(d.Spec.Replicas) == d.Status.Replicas &&
|
||||
*(d.Spec.Replicas) == d.Status.UpdatedReplicas &&
|
||||
d.Generation <= d.Status.ObservedGeneration, nil
|
||||
}
|
||||
|
||||
_, err = watch.Until(2*time.Minute, w, condition)
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("deployment %q never completed: %#v", d.Name, status)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
|
||||
// Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
|
||||
func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string) error {
|
||||
|
@ -66,7 +66,6 @@ Density should allow running maximum capacity pods on nodes,smarterclayton,1
|
||||
Density should allow starting * pods per node using *,derekwaynecarr,0
|
||||
Deployment RecreateDeployment should delete old pods and create new ones,pwittrock,0
|
||||
Deployment RollingUpdateDeployment should delete old pods and create new ones,pwittrock,0
|
||||
Deployment RollingUpdateDeployment should scale up and down in the right order,pwittrock,0
|
||||
Deployment deployment reaping should cascade to its replica sets and pods,wojtek-t,1
|
||||
Deployment deployment should create new pods,pwittrock,0
|
||||
Deployment deployment should delete old replica sets,pwittrock,0
|
||||
|
|
Loading…
Reference in New Issue
Block a user