Extract common code in deployment e2e and integration test

This commit is contained in:
Janet Kuo 2017-05-15 16:01:37 -07:00
parent 282c90bc1a
commit 3f2d8ae682
5 changed files with 222 additions and 326 deletions

View File

@ -63,6 +63,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
testutil "k8s.io/kubernetes/test/utils"
"k8s.io/client-go/discovery" "k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
@ -122,6 +123,9 @@ const (
// How often to Poll pods, nodes and claims. // How often to Poll pods, nodes and claims.
Poll = 2 * time.Second Poll = 2 * time.Second
pollShortTimeout = 1 * time.Minute
pollLongTimeout = 5 * time.Minute
// service accounts are provisioned after namespace creation // service accounts are provisioned after namespace creation
// a service account is required to support pod creation in a namespace as part of admission control // a service account is required to support pod creation in a namespace as part of admission control
ServiceAccountProvisionTimeout = 2 * time.Minute ServiceAccountProvisionTimeout = 2 * time.Minute
@ -3174,7 +3178,7 @@ func NewDeployment(deploymentName string, replicas int32, podLabels map[string]s
Name: deploymentName, Name: deploymentName,
}, },
Spec: extensions.DeploymentSpec{ Spec: extensions.DeploymentSpec{
Replicas: func(i int32) *int32 { return &i }(replicas), Replicas: &replicas,
Selector: &metav1.LabelSelector{MatchLabels: podLabels}, Selector: &metav1.LabelSelector{MatchLabels: podLabels},
Strategy: extensions.DeploymentStrategy{ Strategy: extensions.DeploymentStrategy{
Type: strategyType, Type: strategyType,
@ -3201,72 +3205,7 @@ func NewDeployment(deploymentName string, replicas int32, podLabels map[string]s
// Note that the status should stay valid at all times unless shortly after a scaling event or the deployment is just created. // Note that the status should stay valid at all times unless shortly after a scaling event or the deployment is just created.
// To verify that the deployment status is valid and wait for the rollout to finish, use WaitForDeploymentStatus instead. // To verify that the deployment status is valid and wait for the rollout to finish, use WaitForDeploymentStatus instead.
func WaitForDeploymentStatusValid(c clientset.Interface, d *extensions.Deployment) error { func WaitForDeploymentStatusValid(c clientset.Interface, d *extensions.Deployment) error {
var ( return testutil.WaitForDeploymentStatusValid(c, d, Logf, Poll, pollLongTimeout)
oldRSs, allOldRSs, allRSs []*extensions.ReplicaSet
newRS *extensions.ReplicaSet
deployment *extensions.Deployment
reason string
)
err := wait.Poll(Poll, 5*time.Minute, func() (bool, error) {
var err error
deployment, err = c.Extensions().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
oldRSs, allOldRSs, newRS, err = deploymentutil.GetAllReplicaSets(deployment, c)
if err != nil {
return false, err
}
if newRS == nil {
// New RC hasn't been created yet.
reason = "new replica set hasn't been created yet"
Logf(reason)
return false, nil
}
allRSs = append(oldRSs, newRS)
// The old/new ReplicaSets need to contain the pod-template-hash label
for i := range allRSs {
if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
reason = "all replica sets need to contain the pod-template-hash label"
Logf(reason)
return false, nil
}
}
totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
maxCreated := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
if totalCreated > maxCreated {
reason = fmt.Sprintf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
Logf(reason)
return false, nil
}
minAvailable := deploymentutil.MinAvailable(deployment)
if deployment.Status.AvailableReplicas < minAvailable {
reason = fmt.Sprintf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
Logf(reason)
return false, nil
}
// When the deployment status and its underlying resources reach the desired state, we're done
if deploymentutil.DeploymentComplete(deployment, &deployment.Status) {
return true, nil
}
reason = fmt.Sprintf("deployment status: %#v", deployment.Status)
Logf(reason)
return false, nil
})
if err == wait.ErrWaitTimeout {
logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
logPodsOfDeployment(c, deployment, allRSs)
err = fmt.Errorf("%s", reason)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.Name, err)
}
return nil
} }
// Waits for the deployment to reach desired state. // Waits for the deployment to reach desired state.
@ -3409,66 +3348,7 @@ func WatchRecreateDeployment(c clientset.Interface, d *extensions.Deployment) er
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image. // WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
// Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early. // Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string) error { func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string) error {
var deployment *extensions.Deployment return testutil.WaitForDeploymentRevisionAndImage(c, ns, deploymentName, revision, image, Logf, Poll, pollShortTimeout)
var newRS *extensions.ReplicaSet
var reason string
err := wait.Poll(Poll, 1*time.Minute, func() (bool, error) {
var err error
deployment, err = c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
// The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
newRS, err = deploymentutil.GetNewReplicaSet(deployment, c)
if err != nil {
return false, err
}
if newRS == nil {
reason = fmt.Sprintf("New replica set for deployment %q is yet to be created", deployment.Name)
Logf(reason)
return false, nil
}
if !labelsutil.SelectorHasLabel(newRS.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
reason = fmt.Sprintf("New replica set %q doesn't have DefaultDeploymentUniqueLabelKey", newRS.Name)
Logf(reason)
return false, nil
}
// Check revision of this deployment, and of the new replica set of this deployment
if deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
reason = fmt.Sprintf("Deployment %q doesn't have the required revision set", deployment.Name)
Logf(reason)
return false, nil
}
if deployment.Spec.Template.Spec.Containers[0].Image != image {
reason = fmt.Sprintf("Deployment %q doesn't have the required image set", deployment.Name)
Logf(reason)
return false, nil
}
if newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision {
reason = fmt.Sprintf("New replica set %q doesn't have the required revision set", newRS.Name)
Logf(reason)
return false, nil
}
if newRS.Spec.Template.Spec.Containers[0].Image != image {
reason = fmt.Sprintf("New replica set %q doesn't have the required image set", newRS.Name)
Logf(reason)
return false, nil
}
return true, nil
})
if err == wait.ErrWaitTimeout {
logReplicaSetsOfDeployment(deployment, nil, newRS)
err = fmt.Errorf(reason)
}
if newRS == nil {
return fmt.Errorf("deployment %q failed to create new replica set", deploymentName)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %q (got %s / %s) and new replica set %q (got %s / %s) revision and image to match expectation (expected %s / %s): %v", deploymentName, deployment.Annotations[deploymentutil.RevisionAnnotation], deployment.Spec.Template.Spec.Containers[0].Image, newRS.Name, newRS.Annotations[deploymentutil.RevisionAnnotation], newRS.Spec.Template.Spec.Containers[0].Image, revision, image, err)
}
return nil
} }
// CheckNewRSAnnotations check if the new RS's annotation is as expected // CheckNewRSAnnotations check if the new RS's annotation is as expected
@ -3533,17 +3413,7 @@ func WaitForDeploymentOldRSsNum(c clientset.Interface, ns, deploymentName string
} }
func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) { func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) {
if newRS != nil { testutil.LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, Logf)
Logf("New ReplicaSet of Deployment %s:\n%+v", deployment.Name, *newRS)
} else {
Logf("New ReplicaSet of Deployment %s is nil.", deployment.Name)
}
if len(allOldRSs) > 0 {
Logf("All old ReplicaSets of Deployment %s:", deployment.Name)
}
for i := range allOldRSs {
Logf("%+v", *allOldRSs[i])
}
} }
func WaitForObservedDeployment(c clientset.Interface, ns, deploymentName string, desiredGeneration int64) error { func WaitForObservedDeployment(c clientset.Interface, ns, deploymentName string, desiredGeneration int64) error {
@ -3575,24 +3445,7 @@ func WaitForDeploymentWithCondition(c clientset.Interface, ns, deploymentName, r
} }
func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deployment, rsList []*extensions.ReplicaSet) { func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deployment, rsList []*extensions.ReplicaSet) {
minReadySeconds := deployment.Spec.MinReadySeconds testutil.LogPodsOfDeployment(c, deployment, rsList, Logf)
podListFunc := func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
return c.Core().Pods(namespace).List(options)
}
podList, err := deploymentutil.ListPods(deployment, rsList, podListFunc)
if err != nil {
Logf("Failed to list Pods of Deployment %s: %v", deployment.Name, err)
return
}
for _, pod := range podList.Items {
availability := "not available"
if podutil.IsPodAvailable(&pod, minReadySeconds, metav1.Now()) {
availability = "available"
}
Logf("Pod %s is %s:\n%+v", pod.Name, availability, pod)
}
} }
// Waits for the number of events on the given object to reach a desired count. // Waits for the number of events on the given object to reach a desired count.

View File

@ -31,11 +31,9 @@ go_library(
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions:go_default_library", "//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/controller/deployment:go_default_library", "//pkg/controller/deployment:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//pkg/controller/replicaset:go_default_library", "//pkg/controller/replicaset:go_default_library",
"//pkg/util/labels:go_default_library",
"//test/integration/framework:go_default_library", "//test/integration/framework:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library", "//test/utils:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library", "//vendor/k8s.io/client-go/rest:go_default_library",

View File

@ -17,12 +17,10 @@ limitations under the License.
package deployment package deployment
import ( import (
"fmt"
"net/http/httptest" "net/http/httptest"
"testing" "testing"
"time" "time"
"github.com/davecgh/go-spew/spew"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
@ -32,10 +30,9 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller/deployment" "k8s.io/kubernetes/pkg/controller/deployment"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/controller/replicaset" "k8s.io/kubernetes/pkg/controller/replicaset"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
testutil "k8s.io/kubernetes/test/utils"
) )
const ( const (
@ -132,103 +129,8 @@ func addPodConditionReady(pod *v1.Pod, time metav1.Time) {
} }
} }
func (d *deploymentTester) logReplicaSetsOfDeployment(allOldRSs []*v1beta1.ReplicaSet, newRS *v1beta1.ReplicaSet) {
if newRS != nil {
d.t.Logf("New ReplicaSet of Deployment %s:\n%+v", d.deployment.Name, *newRS)
} else {
d.t.Logf("New ReplicaSet of Deployment %s is nil.", d.deployment.Name)
}
if len(allOldRSs) > 0 {
d.t.Logf("All old ReplicaSets of Deployment %s:", d.deployment.Name)
}
for i := range allOldRSs {
d.t.Logf(spew.Sprintf("%#v", *allOldRSs[i]))
}
}
func (d *deploymentTester) logPodsOfDeployment(rsList []*v1beta1.ReplicaSet) {
minReadySeconds := d.deployment.Spec.MinReadySeconds
podListFunc := func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
return d.c.Core().Pods(namespace).List(options)
}
podList, err := deploymentutil.ListPods(d.deployment, rsList, podListFunc)
if err != nil {
d.t.Logf("Failed to list Pods of Deployment %s: %v", d.deployment.Name, err)
return
}
for _, pod := range podList.Items {
availability := "not available"
if podutil.IsPodAvailable(&pod, minReadySeconds, metav1.Now()) {
availability = "available"
}
d.t.Logf("Pod %s is %s:\n%s", pod.Name, availability, spew.Sprintf("%#v", pod))
}
}
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
// Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
func (d *deploymentTester) waitForDeploymentRevisionAndImage(revision, image string) error { func (d *deploymentTester) waitForDeploymentRevisionAndImage(revision, image string) error {
var deployment *v1beta1.Deployment return testutil.WaitForDeploymentRevisionAndImage(d.c, d.deployment.Namespace, d.deployment.Name, revision, image, d.t.Logf, pollInterval, pollTimeout)
var newRS *v1beta1.ReplicaSet
var reason string
deploymentName, ns := d.deployment.Name, d.deployment.Namespace
err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
var err error
deployment, err = d.c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
// The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
newRS, err = deploymentutil.GetNewReplicaSet(deployment, d.c)
if err != nil {
return false, err
}
if newRS == nil {
reason = fmt.Sprintf("New replica set for deployment %q is yet to be created", deployment.Name)
d.t.Logf(reason)
return false, nil
}
if !labelsutil.SelectorHasLabel(newRS.Spec.Selector, v1beta1.DefaultDeploymentUniqueLabelKey) {
reason = fmt.Sprintf("New replica set %q doesn't have DefaultDeploymentUniqueLabelKey", newRS.Name)
d.t.Logf(reason)
return false, nil
}
// Check revision of this deployment, and of the new replica set of this deployment
if deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
reason = fmt.Sprintf("Deployment %q doesn't have the required revision set", deployment.Name)
d.t.Logf(reason)
return false, nil
}
if deployment.Spec.Template.Spec.Containers[0].Image != image {
reason = fmt.Sprintf("Deployment %q doesn't have the required image set", deployment.Name)
d.t.Logf(reason)
return false, nil
}
if newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision {
reason = fmt.Sprintf("New replica set %q doesn't have the required revision set", newRS.Name)
d.t.Logf(reason)
return false, nil
}
if newRS.Spec.Template.Spec.Containers[0].Image != image {
reason = fmt.Sprintf("New replica set %q doesn't have the required image set", newRS.Name)
d.t.Logf(reason)
return false, nil
}
return true, nil
})
if err == wait.ErrWaitTimeout {
d.logReplicaSetsOfDeployment(nil, newRS)
err = fmt.Errorf(reason)
}
if newRS == nil {
return fmt.Errorf("deployment %q failed to create new replica set", deploymentName)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %q (got %s / %s) and new replica set %q (got %s / %s) revision and image to match expectation (expected %s / %s): %v", deploymentName, deployment.Annotations[deploymentutil.RevisionAnnotation], deployment.Spec.Template.Spec.Containers[0].Image, newRS.Name, newRS.Annotations[deploymentutil.RevisionAnnotation], newRS.Spec.Template.Spec.Containers[0].Image, revision, image, err)
}
return nil
} }
// markAllPodsReady manually updates all Deployment pods status to ready // markAllPodsReady manually updates all Deployment pods status to ready
@ -270,73 +172,7 @@ func (d *deploymentTester) markAllPodsReady() {
} }
func (d *deploymentTester) waitForDeploymentStatusValid() error { func (d *deploymentTester) waitForDeploymentStatusValid() error {
var ( return testutil.WaitForDeploymentStatusValid(d.c, d.deployment, d.t.Logf, pollInterval, pollTimeout)
oldRSs, allOldRSs, allRSs []*v1beta1.ReplicaSet
newRS *v1beta1.ReplicaSet
deployment *v1beta1.Deployment
reason string
)
name := d.deployment.Name
err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
var err error
deployment, err = d.c.Extensions().Deployments(d.deployment.Namespace).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
oldRSs, allOldRSs, newRS, err = deploymentutil.GetAllReplicaSets(deployment, d.c)
if err != nil {
return false, err
}
if newRS == nil {
// New RC hasn't been created yet.
reason = "new replica set hasn't been created yet"
d.t.Logf(reason)
return false, nil
}
allRSs = append(oldRSs, newRS)
// The old/new ReplicaSets need to contain the pod-template-hash label
for i := range allRSs {
if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, v1beta1.DefaultDeploymentUniqueLabelKey) {
reason = "all replica sets need to contain the pod-template-hash label"
d.t.Logf(reason)
return false, nil
}
}
totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
maxCreated := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
if totalCreated > maxCreated {
reason = fmt.Sprintf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
d.t.Logf(reason)
return false, nil
}
minAvailable := deploymentutil.MinAvailable(deployment)
if deployment.Status.AvailableReplicas < minAvailable {
reason = fmt.Sprintf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
d.t.Logf(reason)
return false, nil
}
// When the deployment status and its underlying resources reach the desired state, we're done
if deploymentutil.DeploymentComplete(deployment, &deployment.Status) {
return true, nil
}
reason = fmt.Sprintf("deployment status: %#v", deployment.Status)
d.t.Logf(reason)
return false, nil
})
if err == wait.ErrWaitTimeout {
d.logReplicaSetsOfDeployment(allOldRSs, newRS)
d.logPodsOfDeployment(allRSs)
err = fmt.Errorf("%s", reason)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.deployment.Name, err)
}
return nil
} }
// waitForDeploymentStatusValidAndMarkPodsReady waits for the Deployment status to become valid // waitForDeploymentStatusValidAndMarkPodsReady waits for the Deployment status to become valid

View File

@ -12,6 +12,7 @@ go_library(
srcs = [ srcs = [
"conditions.go", "conditions.go",
"density_utils.go", "density_utils.go",
"deployment.go",
"pod_store.go", "pod_store.go",
"runners.go", "runners.go",
"tmpdir.go", "tmpdir.go",
@ -20,12 +21,15 @@ go_library(
deps = [ deps = [
"//pkg/api:go_default_library", "//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library", "//pkg/api/v1:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/apis/batch:go_default_library", "//pkg/apis/batch:go_default_library",
"//pkg/apis/batch/v1:go_default_library", "//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/extensions:go_default_library", "//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library", "//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//pkg/util/labels:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",

205
test/utils/deployment.go Normal file
View File

@ -0,0 +1,205 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/api/v1"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)
type LogfFn func(format string, args ...interface{})
func LogReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, logf LogfFn) {
if newRS != nil {
logf("New ReplicaSet of Deployment %s:\n%+v", deployment.Name, *newRS)
} else {
logf("New ReplicaSet of Deployment %s is nil.", deployment.Name)
}
if len(allOldRSs) > 0 {
logf("All old ReplicaSets of Deployment %s:", deployment.Name)
}
for i := range allOldRSs {
logf("%+v", *allOldRSs[i])
}
}
func LogPodsOfDeployment(c clientset.Interface, deployment *extensions.Deployment, rsList []*extensions.ReplicaSet, logf LogfFn) {
minReadySeconds := deployment.Spec.MinReadySeconds
podListFunc := func(namespace string, options metav1.ListOptions) (*v1.PodList, error) {
return c.Core().Pods(namespace).List(options)
}
podList, err := deploymentutil.ListPods(deployment, rsList, podListFunc)
if err != nil {
logf("Failed to list Pods of Deployment %s: %v", deployment.Name, err)
return
}
for _, pod := range podList.Items {
availability := "not available"
if podutil.IsPodAvailable(&pod, minReadySeconds, metav1.Now()) {
availability = "available"
}
logf("Pod %s is %s:\n%+v", pod.Name, availability, pod)
}
}
// Waits for the deployment status to become valid (i.e. max unavailable and max surge aren't violated anymore).
// Note that the status should stay valid at all times unless shortly after a scaling event or the deployment is just created.
// To verify that the deployment status is valid and wait for the rollout to finish, use WaitForDeploymentStatus instead.
func WaitForDeploymentStatusValid(c clientset.Interface, d *extensions.Deployment, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
var (
oldRSs, allOldRSs, allRSs []*extensions.ReplicaSet
newRS *extensions.ReplicaSet
deployment *extensions.Deployment
reason string
)
err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
var err error
deployment, err = c.Extensions().Deployments(d.Namespace).Get(d.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
oldRSs, allOldRSs, newRS, err = deploymentutil.GetAllReplicaSets(deployment, c)
if err != nil {
return false, err
}
if newRS == nil {
// New RC hasn't been created yet.
reason = "new replica set hasn't been created yet"
logf(reason)
return false, nil
}
allRSs = append(oldRSs, newRS)
// The old/new ReplicaSets need to contain the pod-template-hash label
for i := range allRSs {
if !labelsutil.SelectorHasLabel(allRSs[i].Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
reason = "all replica sets need to contain the pod-template-hash label"
logf(reason)
return false, nil
}
}
totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
maxCreated := *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
if totalCreated > maxCreated {
reason = fmt.Sprintf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
logf(reason)
return false, nil
}
minAvailable := deploymentutil.MinAvailable(deployment)
if deployment.Status.AvailableReplicas < minAvailable {
reason = fmt.Sprintf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
logf(reason)
return false, nil
}
// When the deployment status and its underlying resources reach the desired state, we're done
if deploymentutil.DeploymentComplete(deployment, &deployment.Status) {
return true, nil
}
reason = fmt.Sprintf("deployment status: %#v", deployment.Status)
logf(reason)
return false, nil
})
if err == wait.ErrWaitTimeout {
LogReplicaSetsOfDeployment(deployment, allOldRSs, newRS, logf)
LogPodsOfDeployment(c, deployment, allRSs, logf)
err = fmt.Errorf("%s", reason)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %q status to match expectation: %v", d.Name, err)
}
return nil
}
// WaitForDeploymentRevisionAndImage waits for the deployment's and its new RS's revision and container image to match the given revision and image.
// Note that deployment revision and its new RS revision should be updated shortly, so we only wait for 1 minute here to fail early.
func WaitForDeploymentRevisionAndImage(c clientset.Interface, ns, deploymentName string, revision, image string, logf LogfFn, pollInterval, pollTimeout time.Duration) error {
var deployment *extensions.Deployment
var newRS *extensions.ReplicaSet
var reason string
err := wait.Poll(pollInterval, pollTimeout, func() (bool, error) {
var err error
deployment, err = c.Extensions().Deployments(ns).Get(deploymentName, metav1.GetOptions{})
if err != nil {
return false, err
}
// The new ReplicaSet needs to be non-nil and contain the pod-template-hash label
newRS, err = deploymentutil.GetNewReplicaSet(deployment, c)
if err != nil {
return false, err
}
if newRS == nil {
reason = fmt.Sprintf("New replica set for deployment %q is yet to be created", deployment.Name)
logf(reason)
return false, nil
}
if !labelsutil.SelectorHasLabel(newRS.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
reason = fmt.Sprintf("New replica set %q doesn't have DefaultDeploymentUniqueLabelKey", newRS.Name)
logf(reason)
return false, nil
}
// Check revision of this deployment, and of the new replica set of this deployment
if deployment.Annotations == nil || deployment.Annotations[deploymentutil.RevisionAnnotation] != revision {
reason = fmt.Sprintf("Deployment %q doesn't have the required revision set", deployment.Name)
logf(reason)
return false, nil
}
if deployment.Spec.Template.Spec.Containers[0].Image != image {
reason = fmt.Sprintf("Deployment %q doesn't have the required image set", deployment.Name)
logf(reason)
return false, nil
}
if newRS.Annotations == nil || newRS.Annotations[deploymentutil.RevisionAnnotation] != revision {
reason = fmt.Sprintf("New replica set %q doesn't have the required revision set", newRS.Name)
logf(reason)
return false, nil
}
if newRS.Spec.Template.Spec.Containers[0].Image != image {
reason = fmt.Sprintf("New replica set %q doesn't have the required image set", newRS.Name)
logf(reason)
return false, nil
}
return true, nil
})
if err == wait.ErrWaitTimeout {
LogReplicaSetsOfDeployment(deployment, nil, newRS, logf)
err = fmt.Errorf(reason)
}
if newRS == nil {
return fmt.Errorf("deployment %q failed to create new replica set", deploymentName)
}
if err != nil {
return fmt.Errorf("error waiting for deployment %q (got %s / %s) and new replica set %q (got %s / %s) revision and image to match expectation (expected %s / %s): %v", deploymentName, deployment.Annotations[deploymentutil.RevisionAnnotation], deployment.Spec.Template.Spec.Containers[0].Image, newRS.Name, newRS.Annotations[deploymentutil.RevisionAnnotation], newRS.Spec.Template.Spec.Containers[0].Image, revision, image, err)
}
return nil
}