mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 15:58:37 +00:00
Merge pull request #34645 from kargakis/rs-conditions-controller-changes
Automatic merge from submit-queue Replica set conditions controller changes Follow-up to https://github.com/kubernetes/kubernetes/pull/33905, partially addresses https://github.com/kubernetes/kubernetes/issues/32863. @smarterclayton @soltysh @bgrant0607 @mfojtik I just need to add e2e tests
This commit is contained in:
commit
929d3f74e8
@ -618,36 +618,10 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
|
||||
manageReplicasErr = rsc.manageReplicas(filteredPods, &rs)
|
||||
}
|
||||
|
||||
// Count the number of pods that have labels matching the labels of the pod
|
||||
// template of the replicaSet, the matching pods may have more labels than
|
||||
// are in the template. Because the label of podTemplateSpec is a superset
|
||||
// of the selector of the replicaset, so the possible matching pods must be
|
||||
// part of the filteredPods.
|
||||
fullyLabeledReplicasCount := 0
|
||||
readyReplicasCount := 0
|
||||
availableReplicasCount := 0
|
||||
templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
|
||||
for _, pod := range filteredPods {
|
||||
if templateLabel.Matches(labels.Set(pod.Labels)) {
|
||||
fullyLabeledReplicasCount++
|
||||
}
|
||||
if api.IsPodReady(pod) {
|
||||
readyReplicasCount++
|
||||
if api.IsPodAvailable(pod, rs.Spec.MinReadySeconds, unversioned.Now()) {
|
||||
availableReplicasCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)
|
||||
|
||||
// Always updates status as pods come up or die.
|
||||
if err := updateReplicaCount(
|
||||
rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace),
|
||||
rs,
|
||||
len(filteredPods),
|
||||
fullyLabeledReplicasCount,
|
||||
readyReplicasCount,
|
||||
availableReplicasCount,
|
||||
); err != nil {
|
||||
if err := updateReplicaSetStatus(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, newStatus); err != nil {
|
||||
// Multiple things could lead to this update failing. Requeuing the replica set ensures
|
||||
// Returning an error causes a requeue without forcing a hotloop
|
||||
return err
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"math/rand"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -386,6 +387,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
StatusCode: 200,
|
||||
ResponseBody: "{}",
|
||||
SkipRequestFn: skipListerFunc,
|
||||
T: t,
|
||||
}
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
defer testServer.Close()
|
||||
@ -443,9 +445,9 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
|
||||
manager.syncReplicaSet(getKey(rsSpec, t))
|
||||
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
|
||||
|
||||
// 1 PUT for the ReplicaSet status during dormancy window.
|
||||
// 2 PUT for the ReplicaSet status during dormancy window.
|
||||
// Note that the pod creates go through pod control so they're not recorded.
|
||||
fakeHandler.ValidateRequestCount(t, 1)
|
||||
fakeHandler.ValidateRequestCount(t, 2)
|
||||
}
|
||||
|
||||
func TestPodControllerLookup(t *testing.T) {
|
||||
@ -715,8 +717,9 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
|
||||
return true, &extensions.ReplicaSet{}, fmt.Errorf("Fake error")
|
||||
})
|
||||
fakeRSClient := fakeClient.Extensions().ReplicaSets("default")
|
||||
numReplicas := 10
|
||||
updateReplicaCount(fakeRSClient, *rs, numReplicas, 0, 0, 0)
|
||||
numReplicas := int32(10)
|
||||
newStatus := extensions.ReplicaSetStatus{Replicas: numReplicas}
|
||||
updateReplicaSetStatus(fakeRSClient, *rs, newStatus)
|
||||
updates, gets := 0, 0
|
||||
for _, a := range fakeClient.Actions() {
|
||||
if a.GetResource().Resource != "replicasets" {
|
||||
@ -737,7 +740,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
|
||||
// returned a ReplicaSet with replicas=1.
|
||||
if c, ok := action.GetObject().(*extensions.ReplicaSet); !ok {
|
||||
t.Errorf("Expected a ReplicaSet as the argument to update, got %T", c)
|
||||
} else if int(c.Status.Replicas) != numReplicas {
|
||||
} else if c.Status.Replicas != numReplicas {
|
||||
t.Errorf("Expected update for ReplicaSet to contain replicas %v, got %v instead",
|
||||
numReplicas, c.Status.Replicas)
|
||||
}
|
||||
@ -1430,3 +1433,163 @@ func TestAvailableReplicas(t *testing.T) {
|
||||
fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRs)
|
||||
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
|
||||
}
|
||||
|
||||
var (
|
||||
imagePullBackOff extensions.ReplicaSetConditionType = "ImagePullBackOff"
|
||||
|
||||
condImagePullBackOff = func() extensions.ReplicaSetCondition {
|
||||
return extensions.ReplicaSetCondition{
|
||||
Type: imagePullBackOff,
|
||||
Status: api.ConditionTrue,
|
||||
Reason: "NonExistentImage",
|
||||
}
|
||||
}
|
||||
|
||||
condReplicaFailure = func() extensions.ReplicaSetCondition {
|
||||
return extensions.ReplicaSetCondition{
|
||||
Type: extensions.ReplicaSetReplicaFailure,
|
||||
Status: api.ConditionTrue,
|
||||
Reason: "OtherFailure",
|
||||
}
|
||||
}
|
||||
|
||||
condReplicaFailure2 = func() extensions.ReplicaSetCondition {
|
||||
return extensions.ReplicaSetCondition{
|
||||
Type: extensions.ReplicaSetReplicaFailure,
|
||||
Status: api.ConditionTrue,
|
||||
Reason: "AnotherFailure",
|
||||
}
|
||||
}
|
||||
|
||||
status = func() *extensions.ReplicaSetStatus {
|
||||
return &extensions.ReplicaSetStatus{
|
||||
Conditions: []extensions.ReplicaSetCondition{condReplicaFailure()},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
func TestGetCondition(t *testing.T) {
|
||||
exampleStatus := status()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
status extensions.ReplicaSetStatus
|
||||
condType extensions.ReplicaSetConditionType
|
||||
condStatus api.ConditionStatus
|
||||
condReason string
|
||||
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "condition exists",
|
||||
|
||||
status: *exampleStatus,
|
||||
condType: extensions.ReplicaSetReplicaFailure,
|
||||
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "condition does not exist",
|
||||
|
||||
status: *exampleStatus,
|
||||
condType: imagePullBackOff,
|
||||
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
cond := GetCondition(test.status, test.condType)
|
||||
exists := cond != nil
|
||||
if exists != test.expected {
|
||||
t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetCondition(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
status *extensions.ReplicaSetStatus
|
||||
cond extensions.ReplicaSetCondition
|
||||
|
||||
expectedStatus *extensions.ReplicaSetStatus
|
||||
}{
|
||||
{
|
||||
name: "set for the first time",
|
||||
|
||||
status: &extensions.ReplicaSetStatus{},
|
||||
cond: condReplicaFailure(),
|
||||
|
||||
expectedStatus: &extensions.ReplicaSetStatus{Conditions: []extensions.ReplicaSetCondition{condReplicaFailure()}},
|
||||
},
|
||||
{
|
||||
name: "simple set",
|
||||
|
||||
status: &extensions.ReplicaSetStatus{Conditions: []extensions.ReplicaSetCondition{condImagePullBackOff()}},
|
||||
cond: condReplicaFailure(),
|
||||
|
||||
expectedStatus: &extensions.ReplicaSetStatus{Conditions: []extensions.ReplicaSetCondition{condImagePullBackOff(), condReplicaFailure()}},
|
||||
},
|
||||
{
|
||||
name: "overwrite",
|
||||
|
||||
status: &extensions.ReplicaSetStatus{Conditions: []extensions.ReplicaSetCondition{condReplicaFailure()}},
|
||||
cond: condReplicaFailure2(),
|
||||
|
||||
expectedStatus: &extensions.ReplicaSetStatus{Conditions: []extensions.ReplicaSetCondition{condReplicaFailure2()}},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
SetCondition(test.status, test.cond)
|
||||
if !reflect.DeepEqual(test.status, test.expectedStatus) {
|
||||
t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveCondition(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
status *extensions.ReplicaSetStatus
|
||||
condType extensions.ReplicaSetConditionType
|
||||
|
||||
expectedStatus *extensions.ReplicaSetStatus
|
||||
}{
|
||||
{
|
||||
name: "remove from empty status",
|
||||
|
||||
status: &extensions.ReplicaSetStatus{},
|
||||
condType: extensions.ReplicaSetReplicaFailure,
|
||||
|
||||
expectedStatus: &extensions.ReplicaSetStatus{},
|
||||
},
|
||||
{
|
||||
name: "simple remove",
|
||||
|
||||
status: &extensions.ReplicaSetStatus{Conditions: []extensions.ReplicaSetCondition{condReplicaFailure()}},
|
||||
condType: extensions.ReplicaSetReplicaFailure,
|
||||
|
||||
expectedStatus: &extensions.ReplicaSetStatus{},
|
||||
},
|
||||
{
|
||||
name: "doesn't remove anything",
|
||||
|
||||
status: status(),
|
||||
condType: imagePullBackOff,
|
||||
|
||||
expectedStatus: status(),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
RemoveCondition(test.status, test.condType)
|
||||
if !reflect.DeepEqual(test.status, test.expectedStatus) {
|
||||
t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,24 +20,28 @@ package replicaset
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/internalversion"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
)
|
||||
|
||||
// updateReplicaCount attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
|
||||
func updateReplicaCount(rsClient unversionedextensions.ReplicaSetInterface, rs extensions.ReplicaSet, numReplicas, numFullyLabeledReplicas, numReadyReplicas, numAvailableReplicas int) (updateErr error) {
|
||||
// updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
|
||||
func updateReplicaSetStatus(c unversionedextensions.ReplicaSetInterface, rs extensions.ReplicaSet, newStatus extensions.ReplicaSetStatus) (updateErr error) {
|
||||
// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
|
||||
// we do a periodic relist every 30s. If the generations differ but the replicas are
|
||||
// the same, a caller might've resized to the same replica count.
|
||||
if int(rs.Status.Replicas) == numReplicas &&
|
||||
int(rs.Status.FullyLabeledReplicas) == numFullyLabeledReplicas &&
|
||||
int(rs.Status.ReadyReplicas) == numReadyReplicas &&
|
||||
int(rs.Status.AvailableReplicas) == numAvailableReplicas &&
|
||||
rs.Generation == rs.Status.ObservedGeneration {
|
||||
if rs.Status.Replicas == newStatus.Replicas &&
|
||||
rs.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
|
||||
rs.Status.ReadyReplicas == newStatus.ReadyReplicas &&
|
||||
rs.Status.AvailableReplicas == newStatus.AvailableReplicas &&
|
||||
rs.Generation == rs.Status.ObservedGeneration &&
|
||||
reflect.DeepEqual(rs.Status.Conditions, newStatus.Conditions) {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -53,30 +57,24 @@ func updateReplicaCount(rsClient unversionedextensions.ReplicaSetInterface, rs e
|
||||
// that we've seen a spec update when we retry.
|
||||
// TODO: This can clobber an update if we allow multiple agents to write to the
|
||||
// same status.
|
||||
generation := rs.Generation
|
||||
newStatus.ObservedGeneration = rs.Generation
|
||||
|
||||
var getErr error
|
||||
for i, rs := 0, &rs; ; i++ {
|
||||
glog.V(4).Infof(fmt.Sprintf("Updating replica count for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
|
||||
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, numReplicas, rs.Spec.Replicas) +
|
||||
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, numFullyLabeledReplicas) +
|
||||
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, numReadyReplicas) +
|
||||
fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, numAvailableReplicas) +
|
||||
fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, generation))
|
||||
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, newStatus.Replicas, rs.Spec.Replicas) +
|
||||
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
|
||||
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, newStatus.ReadyReplicas) +
|
||||
fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, newStatus.AvailableReplicas) +
|
||||
fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, newStatus.ObservedGeneration))
|
||||
|
||||
rs.Status = extensions.ReplicaSetStatus{
|
||||
Replicas: int32(numReplicas),
|
||||
FullyLabeledReplicas: int32(numFullyLabeledReplicas),
|
||||
ReadyReplicas: int32(numReadyReplicas),
|
||||
AvailableReplicas: int32(numAvailableReplicas),
|
||||
ObservedGeneration: generation,
|
||||
}
|
||||
_, updateErr = rsClient.UpdateStatus(rs)
|
||||
rs.Status = newStatus
|
||||
_, updateErr = c.UpdateStatus(rs)
|
||||
if updateErr == nil || i >= statusUpdateRetries {
|
||||
return updateErr
|
||||
}
|
||||
// Update the ReplicaSet with the latest resource version for the next poll
|
||||
if rs, getErr = rsClient.Get(rs.Name); getErr != nil {
|
||||
if rs, getErr = c.Get(rs.Name); getErr != nil {
|
||||
// If the GET fails we can't trust status.Replicas anymore. This error
|
||||
// is bound to be more interesting than the update failure.
|
||||
return getErr
|
||||
@ -96,3 +94,97 @@ func (o overlappingReplicaSets) Less(i, j int) bool {
|
||||
}
|
||||
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
|
||||
}
|
||||
|
||||
func calculateStatus(rs extensions.ReplicaSet, filteredPods []*api.Pod, manageReplicasErr error) extensions.ReplicaSetStatus {
|
||||
newStatus := rs.Status
|
||||
// Count the number of pods that have labels matching the labels of the pod
|
||||
// template of the replica set, the matching pods may have more
|
||||
// labels than are in the template. Because the label of podTemplateSpec is
|
||||
// a superset of the selector of the replica set, so the possible
|
||||
// matching pods must be part of the filteredPods.
|
||||
fullyLabeledReplicasCount := 0
|
||||
readyReplicasCount := 0
|
||||
availableReplicasCount := 0
|
||||
templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
|
||||
for _, pod := range filteredPods {
|
||||
if templateLabel.Matches(labels.Set(pod.Labels)) {
|
||||
fullyLabeledReplicasCount++
|
||||
}
|
||||
if api.IsPodReady(pod) {
|
||||
readyReplicasCount++
|
||||
if api.IsPodAvailable(pod, rs.Spec.MinReadySeconds, unversioned.Now()) {
|
||||
availableReplicasCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
failureCond := GetCondition(rs.Status, extensions.ReplicaSetReplicaFailure)
|
||||
if manageReplicasErr != nil && failureCond == nil {
|
||||
var reason string
|
||||
if diff := len(filteredPods) - int(rs.Spec.Replicas); diff < 0 {
|
||||
reason = "FailedCreate"
|
||||
} else if diff > 0 {
|
||||
reason = "FailedDelete"
|
||||
}
|
||||
cond := NewReplicaSetCondition(extensions.ReplicaSetReplicaFailure, api.ConditionTrue, reason, manageReplicasErr.Error())
|
||||
SetCondition(&newStatus, cond)
|
||||
} else if manageReplicasErr == nil && failureCond != nil {
|
||||
RemoveCondition(&newStatus, extensions.ReplicaSetReplicaFailure)
|
||||
}
|
||||
|
||||
newStatus.Replicas = int32(len(filteredPods))
|
||||
newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
|
||||
newStatus.ReadyReplicas = int32(readyReplicasCount)
|
||||
newStatus.AvailableReplicas = int32(availableReplicasCount)
|
||||
return newStatus
|
||||
}
|
||||
|
||||
// NewReplicaSetCondition creates a new replica set condition.
|
||||
func NewReplicaSetCondition(condType extensions.ReplicaSetConditionType, status api.ConditionStatus, reason, msg string) extensions.ReplicaSetCondition {
|
||||
return extensions.ReplicaSetCondition{
|
||||
Type: condType,
|
||||
Status: status,
|
||||
LastTransitionTime: unversioned.Now(),
|
||||
Reason: reason,
|
||||
Message: msg,
|
||||
}
|
||||
}
|
||||
|
||||
// GetCondition returns a replica set condition with the provided type if it exists.
|
||||
func GetCondition(status extensions.ReplicaSetStatus, condType extensions.ReplicaSetConditionType) *extensions.ReplicaSetCondition {
|
||||
for i := range status.Conditions {
|
||||
c := status.Conditions[i]
|
||||
if c.Type == condType {
|
||||
return &c
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetCondition adds/replaces the given condition in the replica set status. If the condition that we
|
||||
// are about to add already exists and has the same status and reason then we are not going to update.
|
||||
func SetCondition(status *extensions.ReplicaSetStatus, condition extensions.ReplicaSetCondition) {
|
||||
currentCond := GetCondition(*status, condition.Type)
|
||||
if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
|
||||
return
|
||||
}
|
||||
newConditions := filterOutCondition(status.Conditions, condition.Type)
|
||||
status.Conditions = append(newConditions, condition)
|
||||
}
|
||||
|
||||
// RemoveCondition removes the condition with the provided type from the replica set status.
|
||||
func RemoveCondition(status *extensions.ReplicaSetStatus, condType extensions.ReplicaSetConditionType) {
|
||||
status.Conditions = filterOutCondition(status.Conditions, condType)
|
||||
}
|
||||
|
||||
// filterOutCondition returns a new slice of replica set conditions without conditions with the provided type.
|
||||
func filterOutCondition(conditions []extensions.ReplicaSetCondition, condType extensions.ReplicaSetConditionType) []extensions.ReplicaSetCondition {
|
||||
var newConditions []extensions.ReplicaSetCondition
|
||||
for _, c := range conditions {
|
||||
if c.Type == condType {
|
||||
continue
|
||||
}
|
||||
newConditions = append(newConditions, c)
|
||||
}
|
||||
return newConditions
|
||||
}
|
||||
|
@ -710,36 +710,10 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
|
||||
}
|
||||
trace.Step("manageReplicas done")
|
||||
|
||||
// Count the number of pods that have labels matching the labels of the pod
|
||||
// template of the replication controller, the matching pods may have more
|
||||
// labels than are in the template. Because the label of podTemplateSpec is
|
||||
// a superset of the selector of the replication controller, so the possible
|
||||
// matching pods must be part of the filteredPods.
|
||||
fullyLabeledReplicasCount := 0
|
||||
readyReplicasCount := 0
|
||||
availableReplicasCount := 0
|
||||
templateLabel := labels.Set(rc.Spec.Template.Labels).AsSelectorPreValidated()
|
||||
for _, pod := range filteredPods {
|
||||
if templateLabel.Matches(labels.Set(pod.Labels)) {
|
||||
fullyLabeledReplicasCount++
|
||||
}
|
||||
if api.IsPodReady(pod) {
|
||||
readyReplicasCount++
|
||||
if api.IsPodAvailable(pod, rc.Spec.MinReadySeconds, unversioned.Now()) {
|
||||
availableReplicasCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
newStatus := calculateStatus(rc, filteredPods, manageReplicasErr)
|
||||
|
||||
// Always updates status as pods come up or die.
|
||||
if err := updateReplicaCount(
|
||||
rm.kubeClient.Core().ReplicationControllers(rc.Namespace),
|
||||
rc,
|
||||
len(filteredPods),
|
||||
fullyLabeledReplicasCount,
|
||||
readyReplicasCount,
|
||||
availableReplicasCount,
|
||||
); err != nil {
|
||||
if err := updateReplicationControllerStatus(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, newStatus); err != nil {
|
||||
// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
|
||||
return err
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
@ -326,6 +327,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
|
||||
fakeHandler := utiltesting.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: "{}",
|
||||
T: t,
|
||||
}
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
defer testServer.Close()
|
||||
@ -378,9 +380,9 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
|
||||
manager.syncReplicationController(getKey(controllerSpec, t))
|
||||
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
|
||||
|
||||
// 1 PUT for the rc status during dormancy window.
|
||||
// 2 PUT for the rc status during dormancy window.
|
||||
// Note that the pod creates go through pod control so they're not recorded.
|
||||
fakeHandler.ValidateRequestCount(t, 1)
|
||||
fakeHandler.ValidateRequestCount(t, 2)
|
||||
}
|
||||
|
||||
func TestPodControllerLookup(t *testing.T) {
|
||||
@ -641,8 +643,9 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
|
||||
return true, &api.ReplicationController{}, fmt.Errorf("Fake error")
|
||||
})
|
||||
fakeRCClient := c.Core().ReplicationControllers("default")
|
||||
numReplicas := 10
|
||||
updateReplicaCount(fakeRCClient, *rc, numReplicas, 0, 0, 0)
|
||||
numReplicas := int32(10)
|
||||
status := api.ReplicationControllerStatus{Replicas: numReplicas}
|
||||
updateReplicationControllerStatus(fakeRCClient, *rc, status)
|
||||
updates, gets := 0, 0
|
||||
for _, a := range c.Actions() {
|
||||
if a.GetResource().Resource != "replicationcontrollers" {
|
||||
@ -663,7 +666,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
|
||||
// returned an rc with replicas=1.
|
||||
if c, ok := action.GetObject().(*api.ReplicationController); !ok {
|
||||
t.Errorf("Expected an rc as the argument to update, got %T", c)
|
||||
} else if c.Status.Replicas != int32(numReplicas) {
|
||||
} else if c.Status.Replicas != numReplicas {
|
||||
t.Errorf("Expected update for rc to contain replicas %v, got %v instead",
|
||||
numReplicas, c.Status.Replicas)
|
||||
}
|
||||
@ -1417,3 +1420,163 @@ func TestAvailableReplicas(t *testing.T) {
|
||||
fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)
|
||||
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
|
||||
}
|
||||
|
||||
var (
|
||||
imagePullBackOff api.ReplicationControllerConditionType = "ImagePullBackOff"
|
||||
|
||||
condImagePullBackOff = func() api.ReplicationControllerCondition {
|
||||
return api.ReplicationControllerCondition{
|
||||
Type: imagePullBackOff,
|
||||
Status: api.ConditionTrue,
|
||||
Reason: "NonExistentImage",
|
||||
}
|
||||
}
|
||||
|
||||
condReplicaFailure = func() api.ReplicationControllerCondition {
|
||||
return api.ReplicationControllerCondition{
|
||||
Type: api.ReplicationControllerReplicaFailure,
|
||||
Status: api.ConditionTrue,
|
||||
Reason: "OtherFailure",
|
||||
}
|
||||
}
|
||||
|
||||
condReplicaFailure2 = func() api.ReplicationControllerCondition {
|
||||
return api.ReplicationControllerCondition{
|
||||
Type: api.ReplicationControllerReplicaFailure,
|
||||
Status: api.ConditionTrue,
|
||||
Reason: "AnotherFailure",
|
||||
}
|
||||
}
|
||||
|
||||
status = func() *api.ReplicationControllerStatus {
|
||||
return &api.ReplicationControllerStatus{
|
||||
Conditions: []api.ReplicationControllerCondition{condReplicaFailure()},
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
func TestGetCondition(t *testing.T) {
|
||||
exampleStatus := status()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
status api.ReplicationControllerStatus
|
||||
condType api.ReplicationControllerConditionType
|
||||
condStatus api.ConditionStatus
|
||||
condReason string
|
||||
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "condition exists",
|
||||
|
||||
status: *exampleStatus,
|
||||
condType: api.ReplicationControllerReplicaFailure,
|
||||
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "condition does not exist",
|
||||
|
||||
status: *exampleStatus,
|
||||
condType: imagePullBackOff,
|
||||
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
cond := GetCondition(test.status, test.condType)
|
||||
exists := cond != nil
|
||||
if exists != test.expected {
|
||||
t.Errorf("%s: expected condition to exist: %t, got: %t", test.name, test.expected, exists)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetCondition(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
status *api.ReplicationControllerStatus
|
||||
cond api.ReplicationControllerCondition
|
||||
|
||||
expectedStatus *api.ReplicationControllerStatus
|
||||
}{
|
||||
{
|
||||
name: "set for the first time",
|
||||
|
||||
status: &api.ReplicationControllerStatus{},
|
||||
cond: condReplicaFailure(),
|
||||
|
||||
expectedStatus: &api.ReplicationControllerStatus{Conditions: []api.ReplicationControllerCondition{condReplicaFailure()}},
|
||||
},
|
||||
{
|
||||
name: "simple set",
|
||||
|
||||
status: &api.ReplicationControllerStatus{Conditions: []api.ReplicationControllerCondition{condImagePullBackOff()}},
|
||||
cond: condReplicaFailure(),
|
||||
|
||||
expectedStatus: &api.ReplicationControllerStatus{Conditions: []api.ReplicationControllerCondition{condImagePullBackOff(), condReplicaFailure()}},
|
||||
},
|
||||
{
|
||||
name: "overwrite",
|
||||
|
||||
status: &api.ReplicationControllerStatus{Conditions: []api.ReplicationControllerCondition{condReplicaFailure()}},
|
||||
cond: condReplicaFailure2(),
|
||||
|
||||
expectedStatus: &api.ReplicationControllerStatus{Conditions: []api.ReplicationControllerCondition{condReplicaFailure2()}},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
SetCondition(test.status, test.cond)
|
||||
if !reflect.DeepEqual(test.status, test.expectedStatus) {
|
||||
t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveCondition(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
status *api.ReplicationControllerStatus
|
||||
condType api.ReplicationControllerConditionType
|
||||
|
||||
expectedStatus *api.ReplicationControllerStatus
|
||||
}{
|
||||
{
|
||||
name: "remove from empty status",
|
||||
|
||||
status: &api.ReplicationControllerStatus{},
|
||||
condType: api.ReplicationControllerReplicaFailure,
|
||||
|
||||
expectedStatus: &api.ReplicationControllerStatus{},
|
||||
},
|
||||
{
|
||||
name: "simple remove",
|
||||
|
||||
status: &api.ReplicationControllerStatus{Conditions: []api.ReplicationControllerCondition{condReplicaFailure()}},
|
||||
condType: api.ReplicationControllerReplicaFailure,
|
||||
|
||||
expectedStatus: &api.ReplicationControllerStatus{},
|
||||
},
|
||||
{
|
||||
name: "doesn't remove anything",
|
||||
|
||||
status: status(),
|
||||
condType: imagePullBackOff,
|
||||
|
||||
expectedStatus: status(),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
RemoveCondition(test.status, test.condType)
|
||||
if !reflect.DeepEqual(test.status, test.expectedStatus) {
|
||||
t.Errorf("%s: expected status: %v, got: %v", test.name, test.expectedStatus, test.status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,52 +20,50 @@ package replication
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
)
|
||||
|
||||
// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
|
||||
func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, controller api.ReplicationController, numReplicas, numFullyLabeledReplicas, numReadyReplicas, numAvailableReplicas int) (updateErr error) {
|
||||
// updateReplicationControllerStatus attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
|
||||
func updateReplicationControllerStatus(c unversionedcore.ReplicationControllerInterface, rc api.ReplicationController, newStatus api.ReplicationControllerStatus) (updateErr error) {
|
||||
// This is the steady state. It happens when the rc doesn't have any expectations, since
|
||||
// we do a periodic relist every 30s. If the generations differ but the replicas are
|
||||
// the same, a caller might've resized to the same replica count.
|
||||
if int(controller.Status.Replicas) == numReplicas &&
|
||||
int(controller.Status.FullyLabeledReplicas) == numFullyLabeledReplicas &&
|
||||
int(controller.Status.ReadyReplicas) == numReadyReplicas &&
|
||||
int(controller.Status.AvailableReplicas) == numAvailableReplicas &&
|
||||
controller.Generation == controller.Status.ObservedGeneration {
|
||||
if rc.Status.Replicas == newStatus.Replicas &&
|
||||
rc.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
|
||||
rc.Status.ReadyReplicas == newStatus.ReadyReplicas &&
|
||||
rc.Status.AvailableReplicas == newStatus.AvailableReplicas &&
|
||||
rc.Generation == rc.Status.ObservedGeneration &&
|
||||
reflect.DeepEqual(rc.Status.Conditions, newStatus.Conditions) {
|
||||
return nil
|
||||
}
|
||||
// Save the generation number we acted on, otherwise we might wrongfully indicate
|
||||
// that we've seen a spec update when we retry.
|
||||
// TODO: This can clobber an update if we allow multiple agents to write to the
|
||||
// same status.
|
||||
generation := controller.Generation
|
||||
newStatus.ObservedGeneration = rc.Generation
|
||||
|
||||
var getErr error
|
||||
for i, rc := 0, &controller; ; i++ {
|
||||
glog.V(4).Infof(fmt.Sprintf("Updating replica count for rc: %s/%s, ", controller.Namespace, controller.Name) +
|
||||
fmt.Sprintf("replicas %d->%d (need %d), ", controller.Status.Replicas, numReplicas, controller.Spec.Replicas) +
|
||||
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", controller.Status.FullyLabeledReplicas, numFullyLabeledReplicas) +
|
||||
fmt.Sprintf("readyReplicas %d->%d, ", controller.Status.ReadyReplicas, numReadyReplicas) +
|
||||
fmt.Sprintf("availableReplicas %d->%d, ", controller.Status.AvailableReplicas, numAvailableReplicas) +
|
||||
fmt.Sprintf("sequence No: %v->%v", controller.Status.ObservedGeneration, generation))
|
||||
for i, rc := 0, &rc; ; i++ {
|
||||
glog.V(4).Infof(fmt.Sprintf("Updating replica count for rc: %s/%s, ", rc.Namespace, rc.Name) +
|
||||
fmt.Sprintf("replicas %d->%d (need %d), ", rc.Status.Replicas, newStatus.Replicas, rc.Spec.Replicas) +
|
||||
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rc.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
|
||||
fmt.Sprintf("readyReplicas %d->%d, ", rc.Status.ReadyReplicas, newStatus.ReadyReplicas) +
|
||||
fmt.Sprintf("availableReplicas %d->%d, ", rc.Status.AvailableReplicas, newStatus.AvailableReplicas) +
|
||||
fmt.Sprintf("sequence No: %v->%v", rc.Status.ObservedGeneration, newStatus.ObservedGeneration))
|
||||
|
||||
rc.Status = api.ReplicationControllerStatus{
|
||||
Replicas: int32(numReplicas),
|
||||
FullyLabeledReplicas: int32(numFullyLabeledReplicas),
|
||||
ReadyReplicas: int32(numReadyReplicas),
|
||||
AvailableReplicas: int32(numAvailableReplicas),
|
||||
ObservedGeneration: generation,
|
||||
}
|
||||
_, updateErr = rcClient.UpdateStatus(rc)
|
||||
rc.Status = newStatus
|
||||
_, updateErr = c.UpdateStatus(rc)
|
||||
if updateErr == nil || i >= statusUpdateRetries {
|
||||
return updateErr
|
||||
}
|
||||
// Update the controller with the latest resource version for the next poll
|
||||
if rc, getErr = rcClient.Get(controller.Name); getErr != nil {
|
||||
if rc, getErr = c.Get(rc.Name); getErr != nil {
|
||||
// If the GET fails we can't trust status.Replicas anymore. This error
|
||||
// is bound to be more interesting than the update failure.
|
||||
return getErr
|
||||
@ -85,3 +83,96 @@ func (o OverlappingControllers) Less(i, j int) bool {
|
||||
}
|
||||
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
|
||||
}
|
||||
|
||||
func calculateStatus(rc api.ReplicationController, filteredPods []*api.Pod, manageReplicasErr error) api.ReplicationControllerStatus {
|
||||
newStatus := rc.Status
|
||||
// Count the number of pods that have labels matching the labels of the pod
|
||||
// template of the replication controller, the matching pods may have more
|
||||
// labels than are in the template. Because the label of podTemplateSpec is
|
||||
// a superset of the selector of the replication controller, so the possible
|
||||
// matching pods must be part of the filteredPods.
|
||||
fullyLabeledReplicasCount := 0
|
||||
readyReplicasCount := 0
|
||||
availableReplicasCount := 0
|
||||
templateLabel := labels.Set(rc.Spec.Template.Labels).AsSelectorPreValidated()
|
||||
for _, pod := range filteredPods {
|
||||
if templateLabel.Matches(labels.Set(pod.Labels)) {
|
||||
fullyLabeledReplicasCount++
|
||||
}
|
||||
if api.IsPodReady(pod) {
|
||||
readyReplicasCount++
|
||||
if api.IsPodAvailable(pod, rc.Spec.MinReadySeconds, unversioned.Now()) {
|
||||
availableReplicasCount++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
failureCond := GetCondition(rc.Status, api.ReplicationControllerReplicaFailure)
|
||||
if manageReplicasErr != nil && failureCond == nil {
|
||||
var reason string
|
||||
if diff := len(filteredPods) - int(rc.Spec.Replicas); diff < 0 {
|
||||
reason = "FailedCreate"
|
||||
} else if diff > 0 {
|
||||
reason = "FailedDelete"
|
||||
}
|
||||
cond := NewReplicationControllerCondition(api.ReplicationControllerReplicaFailure, api.ConditionTrue, reason, manageReplicasErr.Error())
|
||||
SetCondition(&newStatus, cond)
|
||||
} else if manageReplicasErr == nil && failureCond != nil {
|
||||
RemoveCondition(&newStatus, api.ReplicationControllerReplicaFailure)
|
||||
}
|
||||
|
||||
newStatus.Replicas = int32(len(filteredPods))
|
||||
newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
|
||||
newStatus.ReadyReplicas = int32(readyReplicasCount)
|
||||
newStatus.AvailableReplicas = int32(availableReplicasCount)
|
||||
return newStatus
|
||||
}
|
||||
|
||||
// NewReplicationControllerCondition creates a new replication controller condition.
|
||||
func NewReplicationControllerCondition(condType api.ReplicationControllerConditionType, status api.ConditionStatus, reason, msg string) api.ReplicationControllerCondition {
|
||||
return api.ReplicationControllerCondition{
|
||||
Type: condType,
|
||||
Status: status,
|
||||
LastTransitionTime: unversioned.Now(),
|
||||
Reason: reason,
|
||||
Message: msg,
|
||||
}
|
||||
}
|
||||
|
||||
// GetCondition returns a replication controller condition with the provided type if it exists.
|
||||
func GetCondition(status api.ReplicationControllerStatus, condType api.ReplicationControllerConditionType) *api.ReplicationControllerCondition {
|
||||
for i := range status.Conditions {
|
||||
c := status.Conditions[i]
|
||||
if c.Type == condType {
|
||||
return &c
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetCondition adds/replaces the given condition in the replication controller status.
|
||||
func SetCondition(status *api.ReplicationControllerStatus, condition api.ReplicationControllerCondition) {
|
||||
currentCond := GetCondition(*status, condition.Type)
|
||||
if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason {
|
||||
return
|
||||
}
|
||||
newConditions := filterOutCondition(status.Conditions, condition.Type)
|
||||
status.Conditions = append(newConditions, condition)
|
||||
}
|
||||
|
||||
// RemoveCondition removes the condition with the provided type from the replication controller status.
|
||||
func RemoveCondition(status *api.ReplicationControllerStatus, condType api.ReplicationControllerConditionType) {
|
||||
status.Conditions = filterOutCondition(status.Conditions, condType)
|
||||
}
|
||||
|
||||
// filterOutCondition returns a new slice of replication controller conditions without conditions with the provided type.
|
||||
func filterOutCondition(conditions []api.ReplicationControllerCondition, condType api.ReplicationControllerConditionType) []api.ReplicationControllerCondition {
|
||||
var newConditions []api.ReplicationControllerCondition
|
||||
for _, c := range conditions {
|
||||
if c.Type == condType {
|
||||
continue
|
||||
}
|
||||
newConditions = append(newConditions, c)
|
||||
}
|
||||
return newConditions
|
||||
}
|
||||
|
@ -88,6 +88,9 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ
|
||||
f.T.Logf("Received read error: %v", err)
|
||||
}
|
||||
f.RequestBody = string(bodyReceived)
|
||||
if f.T != nil {
|
||||
f.T.Logf("request body: %s", f.RequestBody)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *FakeHandler) ValidateRequestCount(t TestInterface, count int) bool {
|
||||
|
@ -140,6 +140,8 @@ go_library(
|
||||
"//pkg/controller/endpoint:go_default_library",
|
||||
"//pkg/controller/job:go_default_library",
|
||||
"//pkg/controller/petset:go_default_library",
|
||||
"//pkg/controller/replicaset:go_default_library",
|
||||
"//pkg/controller/replication:go_default_library",
|
||||
"//pkg/fields:go_default_library",
|
||||
"//pkg/kubectl:go_default_library",
|
||||
"//pkg/kubectl/cmd/util:go_default_library",
|
||||
|
@ -99,33 +99,6 @@ var _ = framework.KubeDescribe("Deployment", func() {
|
||||
// See https://github.com/kubernetes/kubernetes/issues/29229
|
||||
})
|
||||
|
||||
func newRS(rsName string, replicas int32, rsPodLabels map[string]string, imageName string, image string) *extensions.ReplicaSet {
|
||||
zero := int64(0)
|
||||
return &extensions.ReplicaSet{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: rsName,
|
||||
},
|
||||
Spec: extensions.ReplicaSetSpec{
|
||||
Replicas: replicas,
|
||||
Selector: &unversioned.LabelSelector{MatchLabels: rsPodLabels},
|
||||
Template: api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: rsPodLabels,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
TerminationGracePeriodSeconds: &zero,
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: imageName,
|
||||
Image: image,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newDeployment(deploymentName string, replicas int32, podLabels map[string]string, imageName string, image string, strategyType extensions.DeploymentStrategyType, revisionHistoryLimit *int32) *extensions.Deployment {
|
||||
zero := int64(0)
|
||||
return &extensions.Deployment{
|
||||
|
@ -3232,7 +3232,8 @@ type updateDeploymentFunc func(d *extensions.Deployment)
|
||||
|
||||
func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateDeploymentFunc) (deployment *extensions.Deployment, err error) {
|
||||
deployments := c.Extensions().Deployments(namespace)
|
||||
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
||||
var updateErr error
|
||||
pollErr := wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
||||
if deployment, err = deployments.Get(name); err != nil {
|
||||
return false, err
|
||||
}
|
||||
@ -3242,9 +3243,63 @@ func UpdateDeploymentWithRetries(c clientset.Interface, namespace, name string,
|
||||
Logf("Updating deployment %s", name)
|
||||
return true, nil
|
||||
}
|
||||
updateErr = err
|
||||
return false, nil
|
||||
})
|
||||
return deployment, err
|
||||
if pollErr == wait.ErrWaitTimeout {
|
||||
pollErr = fmt.Errorf("couldn't apply the provided updated to deployment %q: %v", name, updateErr)
|
||||
}
|
||||
return deployment, pollErr
|
||||
}
|
||||
|
||||
type updateRsFunc func(d *extensions.ReplicaSet)
|
||||
|
||||
func UpdateReplicaSetWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateRsFunc) (*extensions.ReplicaSet, error) {
|
||||
var rs *extensions.ReplicaSet
|
||||
var updateErr error
|
||||
pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
||||
var err error
|
||||
if rs, err = c.Extensions().ReplicaSets(namespace).Get(name); err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(rs)
|
||||
if rs, err = c.Extensions().ReplicaSets(namespace).Update(rs); err == nil {
|
||||
Logf("Updating replica set %q", name)
|
||||
return true, nil
|
||||
}
|
||||
updateErr = err
|
||||
return false, nil
|
||||
})
|
||||
if pollErr == wait.ErrWaitTimeout {
|
||||
pollErr = fmt.Errorf("couldn't apply the provided updated to replicaset %q: %v", name, updateErr)
|
||||
}
|
||||
return rs, pollErr
|
||||
}
|
||||
|
||||
type updateRcFunc func(d *api.ReplicationController)
|
||||
|
||||
func UpdateReplicationControllerWithRetries(c clientset.Interface, namespace, name string, applyUpdate updateRcFunc) (*api.ReplicationController, error) {
|
||||
var rc *api.ReplicationController
|
||||
var updateErr error
|
||||
pollErr := wait.PollImmediate(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
||||
var err error
|
||||
if rc, err = c.Core().ReplicationControllers(namespace).Get(name); err != nil {
|
||||
return false, err
|
||||
}
|
||||
// Apply the update, then attempt to push it to the apiserver.
|
||||
applyUpdate(rc)
|
||||
if rc, err = c.Core().ReplicationControllers(namespace).Update(rc); err == nil {
|
||||
Logf("Updating replication controller %q", name)
|
||||
return true, nil
|
||||
}
|
||||
updateErr = err
|
||||
return false, nil
|
||||
})
|
||||
if pollErr == wait.ErrWaitTimeout {
|
||||
pollErr = fmt.Errorf("couldn't apply the provided updated to rc %q: %v", name, updateErr)
|
||||
}
|
||||
return rc, pollErr
|
||||
}
|
||||
|
||||
// NodeAddresses returns the first address of the given type of each node.
|
||||
|
116
test/e2e/rc.go
116
test/e2e/rc.go
@ -21,6 +21,8 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/controller/replication"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/util/uuid"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
@ -43,8 +45,38 @@ var _ = framework.KubeDescribe("ReplicationController", func() {
|
||||
|
||||
ServeImageOrFail(f, "private", "b.gcr.io/k8s_authenticated_test/serve_hostname:v1.4")
|
||||
})
|
||||
|
||||
It("should surface a failure condition on a common issue like exceeded quota", func() {
|
||||
rcConditionCheck(f)
|
||||
})
|
||||
})
|
||||
|
||||
func newRC(rsName string, replicas int32, rcPodLabels map[string]string, imageName string, image string) *api.ReplicationController {
|
||||
zero := int64(0)
|
||||
return &api.ReplicationController{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: rsName,
|
||||
},
|
||||
Spec: api.ReplicationControllerSpec{
|
||||
Replicas: replicas,
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: rcPodLabels,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
TerminationGracePeriodSeconds: &zero,
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: imageName,
|
||||
Image: image,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// A basic test to check the deployment of an image using
|
||||
// a replication controller. The image serves its hostname
|
||||
// which is checked for each replica.
|
||||
@ -117,3 +149,87 @@ func ServeImageOrFail(f *framework.Framework, test string, image string) {
|
||||
framework.Failf("Did not get expected responses within the timeout period of %.2f seconds.", retryTimeout.Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
// 1. Create a quota restricting pods in the current namespace to 2.
|
||||
// 2. Create a replication controller that wants to run 3 pods.
|
||||
// 3. Check replication controller conditions for a ReplicaFailure condition.
|
||||
// 4. Relax quota or scale down the controller and observe the condition is gone.
|
||||
func rcConditionCheck(f *framework.Framework) {
|
||||
c := f.ClientSet
|
||||
namespace := f.Namespace.Name
|
||||
name := "condition-test"
|
||||
|
||||
By(fmt.Sprintf("Creating quota %q that allows only two pods to run in the current namespace", name))
|
||||
quota := newPodQuota(name, "2")
|
||||
_, err := c.Core().ResourceQuotas(namespace).Create(quota)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
quota, err = c.Core().ResourceQuotas(namespace).Get(name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
podQuota := quota.Status.Hard[api.ResourcePods]
|
||||
quantity := resource.MustParse("2")
|
||||
return (&podQuota).Cmp(quantity) == 0, nil
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("resource quota %q never synced", name)
|
||||
}
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Creating rc %q that asks for more than the allowed pod quota", name))
|
||||
rc := newRC(name, 3, map[string]string{"name": name}, nginxImageName, nginxImage)
|
||||
rc, err = c.Core().ReplicationControllers(namespace).Create(rc)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Checking rc %q has the desired failure condition set", name))
|
||||
generation := rc.Generation
|
||||
conditions := rc.Status.Conditions
|
||||
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
rc, err = c.Core().ReplicationControllers(namespace).Get(name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if generation > rc.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
conditions = rc.Status.Conditions
|
||||
|
||||
cond := replication.GetCondition(rc.Status, api.ReplicationControllerReplicaFailure)
|
||||
return cond != nil, nil
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("rc manager never added the failure condition for rc %q: %#v", name, conditions)
|
||||
}
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Scaling down rc %q to satisfy pod quota", name))
|
||||
rc, err = framework.UpdateReplicationControllerWithRetries(c, namespace, name, func(update *api.ReplicationController) {
|
||||
update.Spec.Replicas = 2
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Checking rc %q has no failure condition set", name))
|
||||
generation = rc.Generation
|
||||
conditions = rc.Status.Conditions
|
||||
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
rc, err = c.Core().ReplicationControllers(namespace).Get(name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if generation > rc.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
conditions = rc.Status.Conditions
|
||||
|
||||
cond := replication.GetCondition(rc.Status, api.ReplicationControllerReplicaFailure)
|
||||
return cond == nil, nil
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("rc manager never removed the failure condition for rc %q: %#v", name, conditions)
|
||||
}
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
@ -21,8 +21,10 @@ import (
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/controller/replicaset"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/util/uuid"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
@ -32,6 +34,45 @@ import (
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
func newRS(rsName string, replicas int32, rsPodLabels map[string]string, imageName string, image string) *extensions.ReplicaSet {
|
||||
zero := int64(0)
|
||||
return &extensions.ReplicaSet{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: rsName,
|
||||
},
|
||||
Spec: extensions.ReplicaSetSpec{
|
||||
Replicas: replicas,
|
||||
Template: api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: rsPodLabels,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
TerminationGracePeriodSeconds: &zero,
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: imageName,
|
||||
Image: image,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newPodQuota(name, number string) *api.ResourceQuota {
|
||||
return &api.ResourceQuota{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: name,
|
||||
},
|
||||
Spec: api.ResourceQuotaSpec{
|
||||
Hard: api.ResourceList{
|
||||
api.ResourcePods: resource.MustParse(number),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
var _ = framework.KubeDescribe("ReplicaSet", func() {
|
||||
f := framework.NewDefaultFramework("replicaset")
|
||||
|
||||
@ -45,6 +86,10 @@ var _ = framework.KubeDescribe("ReplicaSet", func() {
|
||||
|
||||
ReplicaSetServeImageOrFail(f, "private", "b.gcr.io/k8s_authenticated_test/serve_hostname:v1.4")
|
||||
})
|
||||
|
||||
It("should surface a failure condition on a common issue like exceeded quota", func() {
|
||||
rsConditionCheck(f)
|
||||
})
|
||||
})
|
||||
|
||||
// A basic test to check the deployment of an image using a ReplicaSet. The
|
||||
@ -118,3 +163,88 @@ func ReplicaSetServeImageOrFail(f *framework.Framework, test string, image strin
|
||||
framework.Failf("Did not get expected responses within the timeout period of %.2f seconds.", retryTimeout.Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
// 1. Create a quota restricting pods in the current namespace to 2.
|
||||
// 2. Create a replica set that wants to run 3 pods.
|
||||
// 3. Check replica set conditions for a ReplicaFailure condition.
|
||||
// 4. Scale down the replica set and observe the condition is gone.
|
||||
func rsConditionCheck(f *framework.Framework) {
|
||||
c := f.ClientSet
|
||||
namespace := f.Namespace.Name
|
||||
name := "condition-test"
|
||||
|
||||
By(fmt.Sprintf("Creating quota %q that allows only two pods to run in the current namespace", name))
|
||||
quota := newPodQuota(name, "2")
|
||||
_, err := c.Core().ResourceQuotas(namespace).Create(quota)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
quota, err = c.Core().ResourceQuotas(namespace).Get(name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
quantity := resource.MustParse("2")
|
||||
podQuota := quota.Status.Hard[api.ResourcePods]
|
||||
return (&podQuota).Cmp(quantity) == 0, nil
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("resource quota %q never synced", name)
|
||||
}
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Creating replica set %q that asks for more than the allowed pod quota", name))
|
||||
rs := newRS(name, 3, map[string]string{"name": name}, nginxImageName, nginxImage)
|
||||
rs, err = c.Extensions().ReplicaSets(namespace).Create(rs)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Checking replica set %q has the desired failure condition set", name))
|
||||
generation := rs.Generation
|
||||
conditions := rs.Status.Conditions
|
||||
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
rs, err = c.Extensions().ReplicaSets(namespace).Get(name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if generation > rs.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
conditions = rs.Status.Conditions
|
||||
|
||||
cond := replicaset.GetCondition(rs.Status, extensions.ReplicaSetReplicaFailure)
|
||||
return cond != nil, nil
|
||||
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("rs controller never added the failure condition for replica set %q: %#v", name, conditions)
|
||||
}
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Scaling down replica set %q to satisfy pod quota", name))
|
||||
rs, err = framework.UpdateReplicaSetWithRetries(c, namespace, name, func(update *extensions.ReplicaSet) {
|
||||
update.Spec.Replicas = 2
|
||||
})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
By(fmt.Sprintf("Checking replica set %q has no failure condition set", name))
|
||||
generation = rs.Generation
|
||||
conditions = rs.Status.Conditions
|
||||
err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
|
||||
rs, err = c.Extensions().ReplicaSets(namespace).Get(name)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if generation > rs.Status.ObservedGeneration {
|
||||
return false, nil
|
||||
}
|
||||
conditions = rs.Status.Conditions
|
||||
|
||||
cond := replicaset.GetCondition(rs.Status, extensions.ReplicaSetReplicaFailure)
|
||||
return cond == nil, nil
|
||||
})
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("rs controller never removed the failure condition for rs %q: %#v", name, conditions)
|
||||
}
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
}
|
||||
|
@ -341,8 +341,10 @@ Reboot each node by triggering kernel panic and ensure they function upon restar
|
||||
Redis should create and stop redis servers,timstclair,1
|
||||
ReplicaSet should serve a basic image on each replica with a private image,pmorie,1
|
||||
ReplicaSet should serve a basic image on each replica with a public image,krousey,0
|
||||
ReplicaSet should surface a failure condition on a common issue like exceeded quota,kargakis,0
|
||||
ReplicationController should serve a basic image on each replica with a private image,jbeda,1
|
||||
ReplicationController should serve a basic image on each replica with a public image,krousey,1
|
||||
ReplicationController should surface a failure condition on a common issue like exceeded quota,kargakis,0
|
||||
Rescheduler should ensure that critical pod is scheduled in case there is no resources available,mtaufen,1
|
||||
Resource-usage regular resource usage tracking resource tracking for * pods per node,janetkuo,1
|
||||
ResourceQuota should create a ResourceQuota and capture the life of a configMap.,timstclair,1
|
||||
|
|
Loading…
Reference in New Issue
Block a user