extensions: add minReadySeconds/availableReplicas to replica sets

Michail Kargakis
2016-09-13 18:59:38 +02:00
parent c1e8c6d878
commit f7c232b8c6
21 changed files with 314 additions and 42 deletions

View File

@@ -640,6 +640,8 @@ func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 {
}
// IsPodAvailable returns true if the pod is available.
// TODO: Remove this once we start using replica set status for calculating available pods
// for a deployment.
func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool {
if !controller.IsPodActive(pod) {
return false
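
The function body is truncated in this view, but the rule it implements is exercised by the new tests below: an active, ready pod counts as available once its Ready condition has been true for at least minReadySeconds. A minimal sketch of that rule using plain time.Time values (the names here are illustrative, not part of this commit):

package availability

import "time"

// isAvailableSketch: a ready pod is available once its Ready condition
// transitioned to true at least minReadySeconds before now. With
// minReadySeconds == 0, ready and available coincide; a zero transition
// time never satisfies a positive minReadySeconds.
func isAvailableSketch(ready bool, lastTransition time.Time, minReadySeconds int32, now time.Time) bool {
	if !ready {
		return false
	}
	if minReadySeconds == 0 {
		return true
	}
	minReady := time.Duration(minReadySeconds) * time.Second
	return !lastTransition.IsZero() && !lastTransition.Add(minReady).After(now)
}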

View File

@@ -658,6 +658,7 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
// part of the filteredPods.
fullyLabeledReplicasCount := 0
readyReplicasCount := 0
availableReplicasCount := 0
templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelectorPreValidated()
for _, pod := range filteredPods {
if templateLabel.Matches(labels.Set(pod.Labels)) {
@@ -665,11 +666,21 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
}
if api.IsPodReady(pod) {
readyReplicasCount++
if api.IsPodAvailable(pod, rs.Spec.MinReadySeconds, unversioned.Now()) {
availableReplicasCount++
}
}
}
// Always updates status as pods come up or die.
if err := updateReplicaCount(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, len(filteredPods), fullyLabeledReplicasCount, readyReplicasCount); err != nil {
if err := updateReplicaCount(
rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace),
rs,
len(filteredPods),
fullyLabeledReplicasCount,
readyReplicasCount,
availableReplicasCount,
); err != nil {
// Multiple things could lead to this update failing. Requeuing the replica set ensures
// we retry with some fairness.
glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rs.Namespace, rs.Name, err)
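
Note how availableReplicasCount is incremented only inside the IsPodReady branch, so availableReplicas <= readyReplicas <= replicas holds by construction. A self-contained sketch of the counting loop with stand-in types (hypothetical, not this package's API):

package main

import (
	"fmt"
	"time"
)

// pod stands in for api.Pod; readySince plays the role of the Ready
// condition's LastTransitionTime.
type pod struct {
	ready      bool
	readySince time.Time
}

// countReplicas mirrors the loop above: the availability check is nested
// under the readiness check, so every available pod is also ready.
func countReplicas(pods []pod, minReadySeconds int32, now time.Time) (ready, available int) {
	for _, p := range pods {
		if !p.ready {
			continue
		}
		ready++
		if minReadySeconds == 0 || now.Sub(p.readySince) >= time.Duration(minReadySeconds)*time.Second {
			available++
		}
	}
	return ready, available
}

func main() {
	now := time.Now()
	pods := []pod{
		{ready: true, readySince: now.Add(-20 * time.Second)}, // ready and available
		{ready: true, readySince: now},                        // ready, not yet available
		{ready: false},                                        // neither
	}
	ready, available := countReplicas(pods, 15, now)
	fmt.Printf("ready=%d available=%d\n", ready, available) // ready=2 available=1
}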

View File

@@ -97,10 +97,14 @@ func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.Repl
}
// create a pod with the given phase for the given rs (same selectors and namespace)
func newPod(name string, rs *extensions.ReplicaSet, status api.PodPhase) *api.Pod {
func newPod(name string, rs *extensions.ReplicaSet, status api.PodPhase, lastTransitionTime *unversioned.Time) *api.Pod {
var conditions []api.PodCondition
if status == api.PodRunning {
conditions = append(conditions, api.PodCondition{Type: api.PodReady, Status: api.ConditionTrue})
condition := api.PodCondition{Type: api.PodReady, Status: api.ConditionTrue}
if lastTransitionTime != nil {
condition.LastTransitionTime = *lastTransitionTime
}
conditions = append(conditions, condition)
}
return &api.Pod{
ObjectMeta: api.ObjectMeta{
@@ -118,7 +122,7 @@ func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[
var trueVar = true
controllerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
for i := 0; i < count; i++ {
pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status)
pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status, nil)
pod.ObjectMeta.Labels = labelMap
pod.OwnerReferences = []api.OwnerReference{controllerReference}
if store != nil {
@@ -253,7 +257,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(activePods, labelMap)
manager.rsStore.Store.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods)}
rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)}
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod")
fakePodControl := controller.FakePodControl{}
@@ -298,7 +302,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
rs := newReplicaSet(5, labelMap)
rs.Spec.Template.Labels = extraLabelMap
manager.rsStore.Store.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, ObservedGeneration: 0}
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0}
rs.Generation = 1
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel")
@@ -316,7 +320,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// 2. Status.FullyLabeledReplicas should equal the number of pods that
// have the extra labels, i.e., 2.
// 3. Every update to the status should include the Generation of the spec.
rs.Status = extensions.ReplicaSetStatus{Replicas: 4, FullyLabeledReplicas: 2, ReadyReplicas: 4, ObservedGeneration: 1}
rs.Status = extensions.ReplicaSetStatus{Replicas: 4, FullyLabeledReplicas: 2, ReadyReplicas: 4, AvailableReplicas: 4, ObservedGeneration: 1}
decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc)
@@ -346,12 +350,14 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
// Creates a replica and sets expectations
rsSpec.Status.Replicas = 1
rsSpec.Status.ReadyReplicas = 1
rsSpec.Status.AvailableReplicas = 1
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
// Expectations prevent replica creation but not a status update
rsSpec.Status.Replicas = 0
rsSpec.Status.ReadyReplicas = 0
rsSpec.Status.AvailableReplicas = 0
fakePodControl.Clear()
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
@@ -367,6 +373,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
manager.expectations.CreationObserved(rsKey)
rsSpec.Status.Replicas = 1
rsSpec.Status.ReadyReplicas = 1
rsSpec.Status.AvailableReplicas = 1
fakePodControl.Clear()
fakePodControl.Err = fmt.Errorf("Fake Error")
@@ -662,7 +669,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
})
fakeRSClient := fakeClient.Extensions().ReplicaSets("default")
numReplicas := 10
updateReplicaCount(fakeRSClient, *rs, numReplicas, 0, 0)
updateReplicaCount(fakeRSClient, *rs, numReplicas, 0, 0, 0)
updates, gets := 0, 0
for _, a := range fakeClient.Actions() {
if a.GetResource().Resource != "replicasets" {
@@ -1098,7 +1105,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
var trueVar = true
otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
// add to podStore a matching Pod controlled by another controller. Expect no patch.
pod := newPod("pod", rs, api.PodRunning)
pod := newPod("pod", rs, api.PodRunning, nil)
pod.OwnerReferences = []api.OwnerReference{otherControllerReference}
manager.podStore.Indexer.Add(pod)
err := manager.syncReplicaSet(getKey(rs, t))
@@ -1118,7 +1125,7 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
// ref, but has an owner ref pointing to another object. Expect a patch to
// take control of it.
unrelatedOwnerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
pod := newPod("pod", rs, api.PodRunning)
pod := newPod("pod", rs, api.PodRunning, nil)
pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference}
manager.podStore.Indexer.Add(pod)
@@ -1138,7 +1145,7 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
// add to podStore a matching pod that has an ownerRef pointing to the rs,
// but ownerRef.Controller is false. Expect a patch to take control of it.
rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name}
pod := newPod("pod", rs, api.PodRunning)
pod := newPod("pod", rs, api.PodRunning, nil)
pod.OwnerReferences = []api.OwnerReference{rsOwnerReference}
manager.podStore.Indexer.Add(pod)
@@ -1157,8 +1164,8 @@ func TestPatchPodFails(t *testing.T) {
manager.rsStore.Store.Add(rs)
// add to podStore two matching pods. Expect two patches to take control of
// them.
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil))
// let both patches fail. The rs controller will assume it fails to take
// control of the pods and create new ones.
fakePodControl.Err = fmt.Errorf("Fake Error")
@@ -1177,9 +1184,9 @@ func TestPatchExtraPodsThenDelete(t *testing.T) {
manager.rsStore.Store.Add(rs)
// add to podStore three matching pods. Expect three patches to take control of
// them, and later delete one of them.
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod3", rs, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod3", rs, api.PodRunning, nil))
err := manager.syncReplicaSet(getKey(rs, t))
if err != nil {
t.Fatal(err)
@@ -1194,7 +1201,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
// put one pod in the podStore
pod := newPod("pod", rs, api.PodRunning)
pod := newPod("pod", rs, api.PodRunning, nil)
pod.ResourceVersion = "1"
var trueVar = true
rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
@@ -1268,7 +1275,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
now := unversioned.Now()
rs.DeletionTimestamp = &now
manager.rsStore.Store.Add(rs)
pod1 := newPod("pod1", rs, api.PodRunning)
pod1 := newPod("pod1", rs, api.PodRunning, nil)
manager.podStore.Indexer.Add(pod1)
// no patch, no create
@@ -1295,7 +1302,7 @@ func TestReadyReplicas(t *testing.T) {
// Status.Replicas should update to match the number of pods in the system; 1 new pod should be created.
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, ObservedGeneration: 1}
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1}
rs.Generation = 1
manager.rsStore.Store.Add(rs)
@@ -1312,7 +1319,56 @@ func TestReadyReplicas(t *testing.T) {
manager.syncReplicaSet(getKey(rs, t))
// ReadyReplicas should go from 0 to 2.
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 2, ObservedGeneration: 1}
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 1}
decRs := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRs)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
}
func TestAvailableReplicas(t *testing.T) {
// This is a happy server just to record the PUT request we expect for status.Replicas
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Status.Replicas should update to match the number of pods in the system; 1 new pod should be created.
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1}
rs.Generation = 1
// minReadySeconds set to 15s
rs.Spec.MinReadySeconds = 15
manager.rsStore.Store.Add(rs)
// First pod became ready 20s ago (-2e10 nanoseconds)
moment := unversioned.Time{Time: time.Now().Add(-2e10)}
pod := newPod("pod", rs, api.PodRunning, &moment)
manager.podStore.Indexer.Add(pod)
// Second pod became ready just now
otherMoment := unversioned.Now()
otherPod := newPod("otherPod", rs, api.PodRunning, &otherMoment)
manager.podStore.Indexer.Add(otherPod)
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
fakeHandler.ResponseBody = response
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
// The controller should see only one available pod.
manager.syncReplicaSet(getKey(rs, t))
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 2, AvailableReplicas: 1, ObservedGeneration: 1}
decRs := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRs)
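
The fixture change that makes this test possible is newPod's new trailing *unversioned.Time parameter: passing nil leaves the Ready condition's LastTransitionTime at its zero value so existing call sites stay unchanged, while availability tests backdate readiness explicitly. A hypothetical call pattern, reusing the rs fixture and helpers defined above:

// Ready 20s ago, so it clears the 15s minReadySeconds threshold.
backdated := unversioned.Time{Time: time.Now().Add(-20 * time.Second)}
availablePod := newPod("available", rs, api.PodRunning, &backdated)

// nil leaves LastTransitionTime zero: ready, but with minReadySeconds > 0
// such a pod would not yet count as available.
notYetAvailable := newPod("not-yet", rs, api.PodRunning, nil)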

View File

@@ -27,13 +27,14 @@ import (
)
// updateReplicaCount attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.ReplicaSet, numReplicas, numFullyLabeledReplicas, numReadyReplicas int) (updateErr error) {
func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.ReplicaSet, numReplicas, numFullyLabeledReplicas, numReadyReplicas, numAvailableReplicas int) (updateErr error) {
// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
if int(rs.Status.Replicas) == numReplicas &&
int(rs.Status.FullyLabeledReplicas) == numFullyLabeledReplicas &&
int(rs.Status.ReadyReplicas) == numReadyReplicas &&
int(rs.Status.AvailableReplicas) == numAvailableReplicas &&
rs.Generation == rs.Status.ObservedGeneration {
return nil
}
@@ -49,12 +50,14 @@ func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.Repli
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, numReplicas, rs.Spec.Replicas) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, numFullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, numReadyReplicas) +
fmt.Sprintf("availableReplicas %d->%d, ", rs.Status.AvailableReplicas, numAvailableReplicas) +
fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, generation))
rs.Status = extensions.ReplicaSetStatus{
Replicas: int32(numReplicas),
FullyLabeledReplicas: int32(numFullyLabeledReplicas),
ReadyReplicas: int32(numReadyReplicas),
AvailableReplicas: int32(numAvailableReplicas),
ObservedGeneration: generation,
}
_, updateErr = rsClient.UpdateStatus(rs)
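
The hunk ends before the retry itself; for orientation, here is a minimal sketch of the "single GET/PUT retry" the doc comment promises. The statusUpdateRetries constant and the exact Get/UpdateStatus signatures are assumptions for illustration, not lines from this commit:

package replicaset

import "k8s.io/kubernetes/pkg/apis/extensions"

// statusClient is an illustrative subset of client.ReplicaSetInterface.
type statusClient interface {
	Get(name string) (*extensions.ReplicaSet, error)
	UpdateStatus(rs *extensions.ReplicaSet) (*extensions.ReplicaSet, error)
}

// updateStatusSketch PUTs the new status; on failure it refetches the
// object once (picking up a fresh resourceVersion) and tries again.
func updateStatusSketch(c statusClient, rs extensions.ReplicaSet, newStatus extensions.ReplicaSetStatus) error {
	const statusUpdateRetries = 1
	for i := 0; ; i++ {
		rs.Status = newStatus
		_, err := c.UpdateStatus(&rs)
		if err == nil || i >= statusUpdateRetries {
			return err
		}
		// Most likely a conflict; GET the latest object and retry the PUT.
		fresh, getErr := c.Get(rs.Name)
		if getErr != nil {
			return getErr
		}
		rs = *fresh
	}
}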

View File

@@ -717,6 +717,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
// matching pods must be part of the filteredPods.
fullyLabeledReplicasCount := 0
readyReplicasCount := 0
availableReplicasCount := 0
templateLabel := labels.Set(rc.Spec.Template.Labels).AsSelectorPreValidated()
for _, pod := range filteredPods {
if templateLabel.Matches(labels.Set(pod.Labels)) {
@@ -724,11 +725,21 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
}
if api.IsPodReady(pod) {
readyReplicasCount++
if api.IsPodAvailable(pod, rc.Spec.MinReadySeconds, unversioned.Now()) {
availableReplicasCount++
}
}
}
// Always updates status as pods come up or die.
if err := updateReplicaCount(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, len(filteredPods), fullyLabeledReplicasCount, readyReplicasCount); err != nil {
if err := updateReplicaCount(
rm.kubeClient.Core().ReplicationControllers(rc.Namespace),
rc,
len(filteredPods),
fullyLabeledReplicasCount,
readyReplicasCount,
availableReplicasCount,
); err != nil {
// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
return err
}

View File

@@ -97,10 +97,14 @@ func newReplicationController(replicas int) *api.ReplicationController {
}
// create a pod with the given phase for the given rc (same selectors and namespace).
func newPod(name string, rc *api.ReplicationController, status api.PodPhase) *api.Pod {
func newPod(name string, rc *api.ReplicationController, status api.PodPhase, lastTransitionTime *unversioned.Time) *api.Pod {
var conditions []api.PodCondition
if status == api.PodRunning {
conditions = append(conditions, api.PodCondition{Type: api.PodReady, Status: api.ConditionTrue})
condition := api.PodCondition{Type: api.PodReady, Status: api.ConditionTrue}
if lastTransitionTime != nil {
condition.LastTransitionTime = *lastTransitionTime
}
conditions = append(conditions, condition)
}
return &api.Pod{
ObjectMeta: api.ObjectMeta{
@@ -118,7 +122,7 @@ func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.Repli
var trueVar = true
controllerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar}
for i := 0; i < count; i++ {
pod := newPod(fmt.Sprintf("%s%d", name, i), rc, status)
pod := newPod(fmt.Sprintf("%s%d", name, i), rc, status, nil)
pod.OwnerReferences = []api.OwnerReference{controllerReference}
if store != nil {
store.Add(pod)
@@ -247,7 +251,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
activePods := 5
rc := newReplicationController(activePods)
manager.rcStore.Indexer.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods)}
rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods), AvailableReplicas: int32(activePods)}
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, rc, "pod")
fakePodControl := controller.FakePodControl{}
@@ -288,7 +292,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// Status.Replicas should update to match the number of pods in the system; 1 new pod should be created.
rc := newReplicationController(5)
manager.rcStore.Indexer.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, ObservedGeneration: 0}
rc.Status = api.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 0}
rc.Generation = 1
newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod")
rcCopy := *rc
@@ -309,7 +313,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// 2. Status.FullyLabeledReplicas should equal the number of pods that
// have the extra labels, i.e., 2.
// 3. Every update to the status should include the Generation of the spec.
rc.Status = api.ReplicationControllerStatus{Replicas: 4, ReadyReplicas: 4, ObservedGeneration: 1}
rc.Status = api.ReplicationControllerStatus{Replicas: 4, ReadyReplicas: 4, AvailableReplicas: 4, ObservedGeneration: 1}
decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)
@@ -337,12 +341,14 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
// Creates a replica and sets expectations
controllerSpec.Status.Replicas = 1
controllerSpec.Status.ReadyReplicas = 1
controllerSpec.Status.AvailableReplicas = 1
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
// Expectations prevent replica creation but not a status update
controllerSpec.Status.Replicas = 0
controllerSpec.Status.ReadyReplicas = 0
controllerSpec.Status.AvailableReplicas = 0
fakePodControl.Clear()
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
@@ -358,6 +364,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
manager.expectations.CreationObserved(rcKey)
controllerSpec.Status.Replicas = 1
controllerSpec.Status.ReadyReplicas = 1
controllerSpec.Status.AvailableReplicas = 1
fakePodControl.Clear()
fakePodControl.Err = fmt.Errorf("Fake Error")
@@ -634,7 +641,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
})
fakeRCClient := c.Core().ReplicationControllers("default")
numReplicas := 10
updateReplicaCount(fakeRCClient, *rc, numReplicas, 0, 0)
updateReplicaCount(fakeRCClient, *rc, numReplicas, 0, 0, 0)
updates, gets := 0, 0
for _, a := range c.Actions() {
if a.GetResource().Resource != "replicationcontrollers" {
@@ -1149,7 +1156,7 @@ func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
var trueVar = true
otherControllerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "v1", Kind: "ReplicationController", Name: "AnotherRC", Controller: &trueVar}
// add to podStore a matching Pod controlled by another controller. Expect no patch.
pod := newPod("pod", rc, api.PodRunning)
pod := newPod("pod", rc, api.PodRunning, nil)
pod.OwnerReferences = []api.OwnerReference{otherControllerReference}
manager.podStore.Indexer.Add(pod)
err := manager.syncReplicationController(getKey(rc, t))
@@ -1168,7 +1175,7 @@ func TestPatchPodWithOtherOwnerRef(t *testing.T) {
// ref, but has an owner ref pointing to another object. Expect a patch to
// take control of it.
unrelatedOwnerReference := api.OwnerReference{UID: uuid.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
pod := newPod("pod", rc, api.PodRunning)
pod := newPod("pod", rc, api.PodRunning, nil)
pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference}
manager.podStore.Indexer.Add(pod)
@@ -1187,7 +1194,7 @@ func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
// add to podStore a matching pod that has an ownerRef pointing to the rc,
// but ownerRef.Controller is false. Expect a patch to take control of it.
rcOwnerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name}
pod := newPod("pod", rc, api.PodRunning)
pod := newPod("pod", rc, api.PodRunning, nil)
pod.OwnerReferences = []api.OwnerReference{rcOwnerReference}
manager.podStore.Indexer.Add(pod)
@@ -1205,8 +1212,8 @@ func TestPatchPodFails(t *testing.T) {
manager.rcStore.Indexer.Add(rc)
// add to podStore two matching pods. Expect two patches to take control of
// them.
manager.podStore.Indexer.Add(newPod("pod1", rc, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod2", rc, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod1", rc, api.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod2", rc, api.PodRunning, nil))
// let both patches fail. The rc manager will assume it fails to take
// control of the pods and create new ones.
fakePodControl.Err = fmt.Errorf("Fake Error")
@@ -1224,9 +1231,9 @@ func TestPatchExtraPodsThenDelete(t *testing.T) {
manager.rcStore.Indexer.Add(rc)
// add to podStore three matching pods. Expect three patches to take control of
// them, and later delete one of them.
manager.podStore.Indexer.Add(newPod("pod1", rc, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod2", rc, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod3", rc, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod1", rc, api.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod2", rc, api.PodRunning, nil))
manager.podStore.Indexer.Add(newPod("pod3", rc, api.PodRunning, nil))
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
@@ -1240,7 +1247,7 @@ func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
// put one pod in the podStore
pod := newPod("pod", rc, api.PodRunning)
pod := newPod("pod", rc, api.PodRunning, nil)
pod.ResourceVersion = "1"
var trueVar = true
rcOwnerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar}
@@ -1312,7 +1319,7 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
now := unversioned.Now()
rc.DeletionTimestamp = &now
manager.rcStore.Indexer.Add(rc)
pod1 := newPod("pod1", rc, api.PodRunning)
pod1 := newPod("pod1", rc, api.PodRunning, nil)
manager.podStore.Indexer.Add(pod1)
// no patch, no create
@@ -1338,7 +1345,7 @@ func TestReadyReplicas(t *testing.T) {
// Status.Replicas should update to match the number of pods in the system; 1 new pod should be created.
rc := newReplicationController(2)
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, ObservedGeneration: 1}
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, AvailableReplicas: 0, ObservedGeneration: 1}
rc.Generation = 1
manager.rcStore.Indexer.Add(rc)
@@ -1355,7 +1362,55 @@ func TestReadyReplicas(t *testing.T) {
manager.syncReplicationController(getKey(rc, t))
// ReadyReplicas should go from 0 to 2.
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 2, ObservedGeneration: 1}
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 2, AvailableReplicas: 2, ObservedGeneration: 1}
decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
}
func TestAvailableReplicas(t *testing.T) {
// This is a happy server just to record the PUT request we expect for status.Replicas
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Status.Replicas should update to match the number of pods in the system; 1 new pod should be created.
rc := newReplicationController(2)
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, ObservedGeneration: 1}
rc.Generation = 1
// minReadySeconds set to 15s
rc.Spec.MinReadySeconds = 15
manager.rcStore.Indexer.Add(rc)
// First pod became ready 20s ago (-2e10 nanoseconds)
moment := unversioned.Time{Time: time.Now().Add(-2e10)}
pod := newPod("pod", rc, api.PodRunning, &moment)
manager.podStore.Indexer.Add(pod)
// Second pod became ready just now
otherMoment := unversioned.Now()
otherPod := newPod("otherPod", rc, api.PodRunning, &otherMoment)
manager.podStore.Indexer.Add(otherPod)
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Default.Codec(), &api.ReplicationController{})
fakeHandler.ResponseBody = response
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
// The controller should see only one available pod.
manager.syncReplicationController(getKey(rc, t))
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 2, AvailableReplicas: 1, ObservedGeneration: 1}
decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)

View File

@@ -27,13 +27,14 @@ import (
)
// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, controller api.ReplicationController, numReplicas, numFullyLabeledReplicas, numReadyReplicas int) (updateErr error) {
func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, controller api.ReplicationController, numReplicas, numFullyLabeledReplicas, numReadyReplicas, numAvailableReplicas int) (updateErr error) {
// This is the steady state. It happens when the rc doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
if int(controller.Status.Replicas) == numReplicas &&
int(controller.Status.FullyLabeledReplicas) == numFullyLabeledReplicas &&
int(controller.Status.ReadyReplicas) == numReadyReplicas &&
int(controller.Status.AvailableReplicas) == numAvailableReplicas &&
controller.Generation == controller.Status.ObservedGeneration {
return nil
}
@@ -49,12 +50,14 @@ func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface,
fmt.Sprintf("replicas %d->%d (need %d), ", controller.Status.Replicas, numReplicas, controller.Spec.Replicas) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", controller.Status.FullyLabeledReplicas, numFullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", controller.Status.ReadyReplicas, numReadyReplicas) +
fmt.Sprintf("availableReplicas %d->%d, ", controller.Status.AvailableReplicas, numAvailableReplicas) +
fmt.Sprintf("sequence No: %v->%v", controller.Status.ObservedGeneration, generation))
rc.Status = api.ReplicationControllerStatus{
Replicas: int32(numReplicas),
FullyLabeledReplicas: int32(numFullyLabeledReplicas),
ReadyReplicas: int32(numReadyReplicas),
AvailableReplicas: int32(numAvailableReplicas),
ObservedGeneration: generation,
}
_, updateErr = rcClient.UpdateStatus(rc)