api: add readyReplicas in RC/RS

This commit is contained in:
Michail Kargakis 2016-08-17 18:18:16 +02:00
parent 4e571eafab
commit ab3bc03a4f
10 changed files with 143 additions and 16 deletions

View File

@ -1694,6 +1694,9 @@ type ReplicationControllerStatus struct {
// The number of pods that have labels matching the labels of the pod template of the replication controller.
FullyLabeledReplicas int32 `json:"fullyLabeledReplicas,omitempty"`
// The number of ready replicas for this replication controller.
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// ObservedGeneration is the most recent generation observed by the controller.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

View File

@ -2001,6 +2001,9 @@ type ReplicationControllerStatus struct {
// The number of pods that have labels matching the labels of the pod template of the replication controller.
FullyLabeledReplicas int32 `json:"fullyLabeledReplicas,omitempty" protobuf:"varint,2,opt,name=fullyLabeledReplicas"`
// The number of ready replicas for this replication controller.
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// ObservedGeneration reflects the generation of the most recently observed replication controller.
ObservedGeneration int64 `json:"observedGeneration,omitempty" protobuf:"varint,3,opt,name=observedGeneration"`
}

View File

@ -622,6 +622,9 @@ type ReplicaSetStatus struct {
// The number of pods that have labels matching the labels of the pod template of the replicaset.
FullyLabeledReplicas int32 `json:"fullyLabeledReplicas,omitempty"`
// The number of ready replicas for this replica set.
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// ObservedGeneration is the most recent generation observed by the controller.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

View File

@ -908,6 +908,9 @@ type ReplicaSetStatus struct {
// The number of pods that have labels matching the labels of the pod template of the replicaset.
FullyLabeledReplicas int32 `json:"fullyLabeledReplicas,omitempty" protobuf:"varint,2,opt,name=fullyLabeledReplicas"`
// The number of ready replicas for this replica set.
ReadyReplicas int32 `json:"readyReplicas,omitempty"`
// ObservedGeneration reflects the generation of the most recently observed ReplicaSet.
ObservedGeneration int64 `json:"observedGeneration,omitempty" protobuf:"varint,3,opt,name=observedGeneration"`
}

View File

@ -658,15 +658,19 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
// of the selector of the replicaset, so the possible matching pods must be
// part of the filteredPods.
fullyLabeledReplicasCount := 0
readyReplicasCount := 0
templateLabel := labels.Set(rs.Spec.Template.Labels).AsSelector()
for _, pod := range filteredPods {
if templateLabel.Matches(labels.Set(pod.Labels)) {
fullyLabeledReplicasCount++
}
if api.IsPodReady(pod) {
readyReplicasCount++
}
}
// Always updates status as pods come up or die.
if err := updateReplicaCount(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, len(filteredPods), fullyLabeledReplicasCount); err != nil {
if err := updateReplicaCount(rsc.kubeClient.Extensions().ReplicaSets(rs.Namespace), rs, len(filteredPods), fullyLabeledReplicasCount, readyReplicasCount); err != nil {
// Multiple things could lead to this update failing. Requeuing the replica set ensures
// we retry with some fairness.
glog.V(2).Infof("Failed to update replica count for controller %v/%v; requeuing; error: %v", rs.Namespace, rs.Name, err)

View File

@ -98,13 +98,17 @@ func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.Repl
// create a pod with the given phase for the given rs (same selectors and namespace)
func newPod(name string, rs *extensions.ReplicaSet, status api.PodPhase) *api.Pod {
var conditions []api.PodCondition
if status == api.PodRunning {
conditions = append(conditions, api.PodCondition{Type: api.PodReady, Status: api.ConditionTrue})
}
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: rs.Namespace,
Labels: rs.Spec.Selector.MatchLabels,
},
Status: api.PodStatus{Phase: status},
Status: api.PodStatus{Phase: status, Conditions: conditions},
}
}
@ -249,7 +253,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(activePods, labelMap)
manager.rsStore.Store.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods)}
rs.Status = extensions.ReplicaSetStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods)}
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, labelMap, rs, "pod")
fakePodControl := controller.FakePodControl{}
@ -294,7 +298,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
rs := newReplicaSet(5, labelMap)
rs.Spec.Template.Labels = extraLabelMap
manager.rsStore.Store.Add(rs)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0}
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, ObservedGeneration: 0}
rs.Generation = 1
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, extraLabelMap, rs, "podWithExtraLabel")
@ -312,7 +316,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// 2. Status.FullyLabeledReplicas should equal to the number of pods that
// has the extra labels, i.e., 2.
// 3. Every update to the status should include the Generation of the spec.
rs.Status = extensions.ReplicaSetStatus{Replicas: 4, FullyLabeledReplicas: 2, ObservedGeneration: 1}
rs.Status = extensions.ReplicaSetStatus{Replicas: 4, FullyLabeledReplicas: 2, ReadyReplicas: 4, ObservedGeneration: 1}
decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc)
@ -341,11 +345,13 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
// Creates a replica and sets expectations
rsSpec.Status.Replicas = 1
rsSpec.Status.ReadyReplicas = 1
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
// Expectations prevents replicas but not an update on status
rsSpec.Status.Replicas = 0
rsSpec.Status.ReadyReplicas = 0
fakePodControl.Clear()
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
@ -360,6 +366,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
// fakePodControl error will prevent this, leaving expectations at 0, 0
manager.expectations.CreationObserved(rsKey)
rsSpec.Status.Replicas = 1
rsSpec.Status.ReadyReplicas = 1
fakePodControl.Clear()
fakePodControl.Err = fmt.Errorf("Fake Error")
@ -655,7 +662,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
})
fakeRSClient := fakeClient.Extensions().ReplicaSets("default")
numReplicas := 10
updateReplicaCount(fakeRSClient, *rs, numReplicas, 0)
updateReplicaCount(fakeRSClient, *rs, numReplicas, 0, 0)
updates, gets := 0, 0
for _, a := range fakeClient.Actions() {
if a.GetResource().Resource != "replicasets" {
@ -1271,3 +1278,43 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
}
validateSyncReplicaSet(t, fakePodControl, 0, 0, 0)
}
func TestReadyReplicas(t *testing.T) {
// This is a happy server just to record the PUT request we expect for status.Replicas
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicaSetControllerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 0, ObservedGeneration: 1}
rs.Generation = 1
manager.rsStore.Store.Add(rs)
newPodList(manager.podStore.Indexer, 2, api.PodPending, labelMap, rs, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Extensions.Codec(), &extensions.ReplicaSet{})
fakeHandler.ResponseBody = response
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.syncReplicaSet(getKey(rs, t))
// ReadyReplicas should go from 0 to 2.
rs.Status = extensions.ReplicaSetStatus{Replicas: 2, ReadyReplicas: 2, ObservedGeneration: 1}
decRs := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRs)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
}

View File

@ -27,12 +27,13 @@ import (
)
// updateReplicaCount attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.ReplicaSet, numReplicas, numFullyLabeledReplicas int) (updateErr error) {
func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.ReplicaSet, numReplicas, numFullyLabeledReplicas, numReadyReplicas int) (updateErr error) {
// This is the steady state. It happens when the ReplicaSet doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
if int(rs.Status.Replicas) == numReplicas &&
int(rs.Status.FullyLabeledReplicas) == numFullyLabeledReplicas &&
int(rs.Status.ReadyReplicas) == numReadyReplicas &&
rs.Generation == rs.Status.ObservedGeneration {
return nil
}
@ -47,9 +48,15 @@ func updateReplicaCount(rsClient client.ReplicaSetInterface, rs extensions.Repli
glog.V(4).Infof(fmt.Sprintf("Updating replica count for ReplicaSet: %s/%s, ", rs.Namespace, rs.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", rs.Status.Replicas, numReplicas, rs.Spec.Replicas) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", rs.Status.FullyLabeledReplicas, numFullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", rs.Status.ReadyReplicas, numReadyReplicas) +
fmt.Sprintf("sequence No: %v->%v", rs.Status.ObservedGeneration, generation))
rs.Status = extensions.ReplicaSetStatus{Replicas: int32(numReplicas), FullyLabeledReplicas: int32(numFullyLabeledReplicas), ObservedGeneration: generation}
rs.Status = extensions.ReplicaSetStatus{
Replicas: int32(numReplicas),
FullyLabeledReplicas: int32(numFullyLabeledReplicas),
ReadyReplicas: int32(numReadyReplicas),
ObservedGeneration: generation,
}
_, updateErr = rsClient.UpdateStatus(rs)
if updateErr == nil || i >= statusUpdateRetries {
return updateErr

View File

@ -715,15 +715,19 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
// a superset of the selector of the replication controller, so the possible
// matching pods must be part of the filteredPods.
fullyLabeledReplicasCount := 0
readyReplicasCount := 0
templateLabel := labels.Set(rc.Spec.Template.Labels).AsSelector()
for _, pod := range filteredPods {
if templateLabel.Matches(labels.Set(pod.Labels)) {
fullyLabeledReplicasCount++
}
if api.IsPodReady(pod) {
readyReplicasCount++
}
}
// Always updates status as pods come up or die.
if err := updateReplicaCount(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, len(filteredPods), fullyLabeledReplicasCount); err != nil {
if err := updateReplicaCount(rm.kubeClient.Core().ReplicationControllers(rc.Namespace), rc, len(filteredPods), fullyLabeledReplicasCount, readyReplicasCount); err != nil {
// Multiple things could lead to this update failing. Returning an error causes a requeue without forcing a hotloop
return err
}

View File

@ -98,13 +98,17 @@ func newReplicationController(replicas int) *api.ReplicationController {
// create a pod with the given phase for the given rc (same selectors and namespace).
func newPod(name string, rc *api.ReplicationController, status api.PodPhase) *api.Pod {
var conditions []api.PodCondition
if status == api.PodRunning {
conditions = append(conditions, api.PodCondition{Type: api.PodReady, Status: api.ConditionTrue})
}
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
Labels: rc.Spec.Selector,
Namespace: rc.Namespace,
},
Status: api.PodStatus{Phase: status},
Status: api.PodStatus{Phase: status, Conditions: conditions},
}
}
@ -243,7 +247,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
activePods := 5
rc := newReplicationController(activePods)
manager.rcStore.Indexer.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods)}
rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods), ReadyReplicas: int32(activePods)}
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, rc, "pod")
fakePodControl := controller.FakePodControl{}
@ -284,7 +288,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(5)
manager.rcStore.Indexer.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ObservedGeneration: 0}
rc.Status = api.ReplicationControllerStatus{Replicas: 2, FullyLabeledReplicas: 6, ReadyReplicas: 2, ObservedGeneration: 0}
rc.Generation = 1
newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod")
rcCopy := *rc
@ -305,7 +309,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// 2. Status.FullyLabeledReplicas should equal to the number of pods that
// has the extra labels, i.e., 2.
// 3. Every update to the status should include the Generation of the spec.
rc.Status = api.ReplicationControllerStatus{Replicas: 4, ObservedGeneration: 1}
rc.Status = api.ReplicationControllerStatus{Replicas: 4, ReadyReplicas: 4, ObservedGeneration: 1}
decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)
@ -332,11 +336,13 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
// Creates a replica and sets expectations
controllerSpec.Status.Replicas = 1
controllerSpec.Status.ReadyReplicas = 1
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
// Expectations prevents replicas but not an update on status
controllerSpec.Status.Replicas = 0
controllerSpec.Status.ReadyReplicas = 0
fakePodControl.Clear()
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
@ -351,6 +357,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
// fakePodControl error will prevent this, leaving expectations at 0, 0.
manager.expectations.CreationObserved(rcKey)
controllerSpec.Status.Replicas = 1
controllerSpec.Status.ReadyReplicas = 1
fakePodControl.Clear()
fakePodControl.Err = fmt.Errorf("Fake Error")
@ -627,7 +634,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
})
fakeRCClient := c.Core().ReplicationControllers("default")
numReplicas := 10
updateReplicaCount(fakeRCClient, *rc, numReplicas, 0)
updateReplicaCount(fakeRCClient, *rc, numReplicas, 0, 0)
updates, gets := 0, 0
for _, a := range c.Actions() {
if a.GetResource().Resource != "replicationcontrollers" {
@ -1315,3 +1322,42 @@ func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
}
validateSyncReplication(t, fakePodControl, 0, 0, 0)
}
func TestReadyReplicas(t *testing.T) {
// This is a happy server just to record the PUT request we expect for status.Replicas
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
c := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.podStoreSynced = alwaysReady
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(2)
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 0, ObservedGeneration: 1}
rc.Generation = 1
manager.rcStore.Indexer.Add(rc)
newPodList(manager.podStore.Indexer, 2, api.PodPending, rc, "pod")
newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod")
// This response body is just so we don't err out decoding the http response
response := runtime.EncodeOrDie(testapi.Default.Codec(), &api.ReplicationController{})
fakeHandler.ResponseBody = response
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(rc, t))
// ReadyReplicas should go from 0 to 2.
rc.Status = api.ReplicationControllerStatus{Replicas: 2, ReadyReplicas: 2, ObservedGeneration: 1}
decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
}

View File

@ -27,12 +27,13 @@ import (
)
// updateReplicaCount attempts to update the Status.Replicas of the given controller, with a single GET/PUT retry.
func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, controller api.ReplicationController, numReplicas, numFullyLabeledReplicas int) (updateErr error) {
func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface, controller api.ReplicationController, numReplicas, numFullyLabeledReplicas, numReadyReplicas int) (updateErr error) {
// This is the steady state. It happens when the rc doesn't have any expectations, since
// we do a periodic relist every 30s. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
if int(controller.Status.Replicas) == numReplicas &&
int(controller.Status.FullyLabeledReplicas) == numFullyLabeledReplicas &&
int(controller.Status.ReadyReplicas) == numReadyReplicas &&
controller.Generation == controller.Status.ObservedGeneration {
return nil
}
@ -47,9 +48,15 @@ func updateReplicaCount(rcClient unversionedcore.ReplicationControllerInterface,
glog.V(4).Infof(fmt.Sprintf("Updating replica count for rc: %s/%s, ", controller.Namespace, controller.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", controller.Status.Replicas, numReplicas, controller.Spec.Replicas) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", controller.Status.FullyLabeledReplicas, numFullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", controller.Status.ReadyReplicas, numReadyReplicas) +
fmt.Sprintf("sequence No: %v->%v", controller.Status.ObservedGeneration, generation))
rc.Status = api.ReplicationControllerStatus{Replicas: int32(numReplicas), FullyLabeledReplicas: int32(numFullyLabeledReplicas), ObservedGeneration: generation}
rc.Status = api.ReplicationControllerStatus{
Replicas: int32(numReplicas),
FullyLabeledReplicas: int32(numFullyLabeledReplicas),
ReadyReplicas: int32(numReadyReplicas),
ObservedGeneration: generation,
}
_, updateErr = rcClient.UpdateStatus(rc)
if updateErr == nil || i >= statusUpdateRetries {
return updateErr