Merge pull request #29184 from wojtek-t/rs_controller_ref

Automatic merge from submit-queue

ReplicaSet controller can set/remove ControllerRef

This is mostly a copy from https://github.com/kubernetes/kubernetes/pull/27600
This commit is contained in:
k8s-merge-robot 2016-07-22 04:18:00 -07:00 committed by GitHub
commit 60e59c9461
6 changed files with 860 additions and 113 deletions

View File

@ -364,7 +364,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
if containsResource(resources, "replicasets") {
glog.Infof("Starting ReplicaSet controller")
go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS)).
go replicaset.NewReplicaSetController(podInformer, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas, int(s.LookupCacheSizeForRS), s.EnableGarbageCollector).
Run(int(s.ConcurrentRSSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
}

View File

@ -27,8 +27,10 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
@ -38,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@ -63,6 +66,10 @@ const (
statusUpdateRetries = 1
)
func getRSKind() unversioned.GroupVersionKind {
return v1beta1.SchemeGroupVersion.WithKind("ReplicaSet")
}
// ReplicaSetController is responsible for synchronizing ReplicaSet objects stored
// in the system with actual running pods.
type ReplicaSetController struct {
@ -101,21 +108,25 @@ type ReplicaSetController struct {
// Controllers that need to be synced
queue *workqueue.Type
// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
// manager behaves differently if GC is enabled.
garbageCollectorEnabled bool
}
// NewReplicaSetController creates a new ReplicaSetController.
func NewReplicaSetController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
func NewReplicaSetController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
return newReplicaSetController(
eventBroadcaster.NewRecorder(api.EventSource{Component: "replicaset-controller"}),
podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}
// newReplicaSetController configures a replica set controller with the specified event recorder
func newReplicaSetController(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
func newReplicaSetController(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicaSetController {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replicaset_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
@ -129,6 +140,7 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.New(),
garbageCollectorEnabled: garbageCollectorEnabled,
}
rsc.rsStore.Store, rsc.rsController = framework.NewInformer(
@ -145,7 +157,108 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra
FullControllerResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: rsc.enqueueReplicaSet,
UpdateFunc: func(old, cur interface{}) {
UpdateFunc: rsc.updateRS,
// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
},
)
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
// local storage, so it should be ok.
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
rsc.podStore.Indexer = podInformer.GetIndexer()
rsc.podController = podInformer.GetController()
rsc.syncHandler = rsc.syncReplicaSet
rsc.podStoreSynced = rsc.podController.HasSynced
rsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return rsc
}
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
garbageCollectorEnabled := false
rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rsc.internalPodInformer = podInformer
return rsc
}
// SetEventRecorder replaces the event recorder used by the ReplicaSetController
// with the given recorder. Only used for testing.
func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
// need to pass in a fake.
rsc.podControl = controller.RealPodControl{KubeClient: rsc.kubeClient, Recorder: recorder}
}
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go rsc.rsController.Run(stopCh)
go rsc.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
}
if rsc.internalPodInformer != nil {
go rsc.internalPodInformer.Run(stopCh)
}
<-stopCh
glog.Infof("Shutting down ReplicaSet Controller")
rsc.queue.ShutDown()
}
// getPodReplicaSet returns the replica set managing the given pod.
// TODO: Surface that we are ignoring multiple replica sets for a single pod.
// TODO: use ownerReference.Controller to determine if the rs controls the pod.
func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.ReplicaSet {
// look up in the cache, if cached and the cache is valid, just return cached value
if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached {
rs, ok := obj.(*extensions.ReplicaSet)
if !ok {
// This should not happen
glog.Errorf("lookup cache does not return a ReplicaSet object")
return nil
}
if cached && rsc.isCacheValid(pod, rs) {
return rs
}
}
// if not cached or cached value is invalid, search all the rs to find the matching one, and update cache
rss, err := rsc.rsStore.GetPodReplicaSets(pod)
if err != nil {
glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name)
return nil
}
// In theory, overlapping ReplicaSets is user error. This sorting will not prevent
// oscillation of replicas in all cases, eg:
// rs1 (older rs): [(k1=v1)], replicas=1 rs2: [(k2=v2)], replicas=2
// pod: [(k1:v1), (k2:v2)] will wake both rs1 and rs2, and we will sync rs1.
// pod: [(k2:v2)] will wake rs2 which creates a new replica.
if len(rss) > 1 {
// More than two items in this list indicates user error. If two replicasets
// overlap, sort by creation timestamp, subsort by name, then pick
// the first.
glog.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)
sort.Sort(overlappingReplicaSets(rss))
}
// update lookup cache
rsc.lookupCache.Update(pod, &rss[0])
return &rss[0]
}
// callback when RS is updated
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
oldRS := old.(*extensions.ReplicaSet)
curRS := cur.(*extensions.ReplicaSet)
@ -180,102 +293,6 @@ func newReplicaSetController(eventRecorder record.EventRecorder, podInformer fra
glog.V(4).Infof("Observed updated replica count for ReplicaSet: %v, %d->%d", curRS.Name, oldRS.Status.Replicas, curRS.Status.Replicas)
}
rsc.enqueueReplicaSet(cur)
},
// This will enter the sync loop and no-op, because the replica set has been deleted from the store.
// Note that deleting a replica set immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the replica set.
DeleteFunc: rsc.enqueueReplicaSet,
},
)
podInformer.AddEventHandler(framework.ResourceEventHandlerFuncs{
AddFunc: rsc.addPod,
// This invokes the ReplicaSet for every pod change, eg: host assignment. Though this might seem like
// overkill the most frequent pod update is status, and the associated ReplicaSet will only list from
// local storage, so it should be ok.
UpdateFunc: rsc.updatePod,
DeleteFunc: rsc.deletePod,
})
rsc.podStore.Indexer = podInformer.GetIndexer()
rsc.podController = podInformer.GetController()
rsc.syncHandler = rsc.syncReplicaSet
rsc.podStoreSynced = rsc.podController.HasSynced
rsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
return rsc
}
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicaSetControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicaSetController {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
rsc := NewReplicaSetController(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
rsc.internalPodInformer = podInformer
return rsc
}
// SetEventRecorder replaces the event recorder used by the ReplicaSetController
// with the given recorder. Only used for testing.
func (rsc *ReplicaSetController) SetEventRecorder(recorder record.EventRecorder) {
// TODO: Hack. We can't cleanly shutdown the event recorder, so benchmarks
// need to pass in a fake.
rsc.podControl = controller.RealPodControl{KubeClient: rsc.kubeClient, Recorder: recorder}
}
// Run begins watching and syncing.
func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go rsc.rsController.Run(stopCh)
go rsc.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go wait.Until(rsc.worker, time.Second, stopCh)
}
if rsc.internalPodInformer != nil {
go rsc.internalPodInformer.Run(stopCh)
}
<-stopCh
glog.Infof("Shutting down ReplicaSet Controller")
rsc.queue.ShutDown()
}
// getPodReplicaSet returns the replica set managing the given pod.
// TODO: Surface that we are ignoring multiple replica sets for a single pod.
func (rsc *ReplicaSetController) getPodReplicaSet(pod *api.Pod) *extensions.ReplicaSet {
// look up in the cache, if cached and the cache is valid, just return cached value
if obj, cached := rsc.lookupCache.GetMatchingObject(pod); cached {
rs, ok := obj.(*extensions.ReplicaSet)
if !ok {
// This should not happen
glog.Errorf("lookup cache does not retuen a ReplicaSet object")
return nil
}
if cached && rsc.isCacheValid(pod, rs) {
return rs
}
}
// if not cached or cached value is invalid, search all the rs to find the matching one, and update cache
rss, err := rsc.rsStore.GetPodReplicaSets(pod)
if err != nil {
glog.V(4).Infof("No ReplicaSets found for pod %v, ReplicaSet controller will avoid syncing", pod.Name)
return nil
}
// In theory, overlapping ReplicaSets is user error. This sorting will not prevent
// oscillation of replicas in all cases, eg:
// rs1 (older rs): [(k1=v1)], replicas=1 rs2: [(k2=v2)], replicas=2
// pod: [(k1:v1), (k2:v2)] will wake both rs1 and rs2, and we will sync rs1.
// pod: [(k2:v2)] will wake rs2 which creates a new replica.
if len(rss) > 1 {
// More than two items in this list indicates user error. If two replicasets
// overlap, sort by creation timestamp, subsort by name, then pick
// the first.
glog.Errorf("user error! more than one ReplicaSet is selecting pods with labels: %+v", pod.Labels)
sort.Sort(overlappingReplicaSets(rss))
}
// update lookup cache
rsc.lookupCache.Update(pod, &rss[0])
return &rss[0]
}
// isCacheValid check if the cache is valid
@ -357,9 +374,7 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
return
}
if rs := rsc.getPodReplicaSet(curPod); rs != nil {
rsc.enqueueReplicaSet(rs)
}
// Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod.
if labelChanged {
// If the old and new ReplicaSet are the same, the first one that syncs
// will set expectations preventing any damage from the second.
@ -367,6 +382,10 @@ func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
rsc.enqueueReplicaSet(oldRS)
}
}
if curRS := rsc.getPodReplicaSet(curPod); curRS != nil {
rsc.enqueueReplicaSet(curRS)
}
}
// When a pod is deleted, enqueue the replica set that manages the pod and update its expectations.
@ -456,13 +475,28 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
// into a performance bottleneck. We should generate a UID for the pod
// beforehand and store it via ExpectCreations.
rsc.expectations.ExpectCreations(rsKey, diff)
wait := sync.WaitGroup{}
wait.Add(diff)
var wg sync.WaitGroup
wg.Add(diff)
glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", rs.Namespace, rs.Name, rs.Spec.Replicas, diff)
for i := 0; i < diff; i++ {
go func() {
defer wait.Done()
if err := rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs); err != nil {
defer wg.Done()
var err error
if rsc.garbageCollectorEnabled {
var trueVar = true
controllerRef := &api.OwnerReference{
APIVersion: getRSKind().GroupVersion().String(),
Kind: getRSKind().Kind,
Name: rs.Name,
UID: rs.UID,
Controller: &trueVar,
}
err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
} else {
err = rsc.podControl.CreatePods(rs.Namespace, &rs.Spec.Template, rs)
}
if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for replica set %q/%q", rs.Namespace, rs.Name)
rsc.expectations.CreationObserved(rsKey)
@ -470,7 +504,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
}
}()
}
wait.Wait()
wg.Wait()
} else if diff > 0 {
if diff > rsc.burstReplicas {
diff = rsc.burstReplicas
@ -494,11 +528,11 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
deletedPodKeys = append(deletedPodKeys, controller.PodKey(filteredPods[i]))
}
rsc.expectations.ExpectDeletions(rsKey, deletedPodKeys)
wait := sync.WaitGroup{}
wait.Add(diff)
var wg sync.WaitGroup
wg.Add(diff)
for i := 0; i < diff; i++ {
go func(ix int) {
defer wait.Done()
defer wg.Done()
if err := rsc.podControl.DeletePod(rs.Namespace, filteredPods[ix].Name, rs); err != nil {
// Decrement the expected number of deletes because the informer won't observe this deletion
podKey := controller.PodKey(filteredPods[ix])
@ -508,7 +542,7 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*api.Pod, rs *ext
}
}(i)
}
wait.Wait()
wg.Wait()
}
}
@ -557,16 +591,60 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
glog.Errorf("Error converting pod selector to selector: %v", err)
return err
}
podList, err := rsc.podStore.Pods(rs.Namespace).List(selector)
// TODO: Do the List and Filter in a single pass, or use an index.
var filteredPods []*api.Pod
if rsc.garbageCollectorEnabled {
// list all pods to include the pods that don't match the rs`s selector
// anymore but has the stale controller ref.
podList, err := rsc.podStore.Pods(rs.Namespace).List(labels.Everything())
if err != nil {
glog.Errorf("Error getting pods for ReplicaSet %q: %v", key, err)
glog.Errorf("Error getting pods for rs %q: %v", key, err)
rsc.queue.Add(key)
return err
}
cm := controller.NewPodControllerRefManager(rsc.podControl, rs.ObjectMeta, selector, getRSKind())
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items)
for _, pod := range matchesNeedsController {
err := cm.AdoptPod(pod)
// continue to next pod if adoption fails.
if err != nil {
// If the pod no longer exists, don't even log the error.
if !errors.IsNotFound(err) {
utilruntime.HandleError(err)
}
} else {
matchesAndControlled = append(matchesAndControlled, pod)
}
}
filteredPods = matchesAndControlled
// remove the controllerRef for the pods that no longer have matching labels
var errlist []error
for _, pod := range controlledDoesNotMatch {
err := cm.ReleasePod(pod)
if err != nil {
errlist = append(errlist, err)
}
}
if len(errlist) != 0 {
aggregate := utilerrors.NewAggregate(errlist)
// push the RS into work queue again. We need to try to free the
// pods again otherwise they will stuck with the stale
// controllerRef.
rsc.queue.Add(key)
return aggregate
}
} else {
podList, err := rsc.podStore.Pods(rs.Namespace).List(selector)
if err != nil {
glog.Errorf("Error getting pods for rs %q: %v", key, err)
rsc.queue.Add(key)
return err
}
filteredPods = controller.FilterActivePods(podList.Items)
}
// TODO: Do this in a single pass, or use an index.
filteredPods := controller.FilterActivePods(podList.Items)
if rsNeedsSync {
if rsNeedsSync && rs.DeletionTimestamp == nil {
rsc.manageReplicas(filteredPods, &rs)
}

View File

@ -96,34 +96,46 @@ func newReplicaSet(replicas int, selectorMap map[string]string) *extensions.Repl
return rs
}
// create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store.
func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[string]string, rs *extensions.ReplicaSet, name string) *api.PodList {
pods := []api.Pod{}
for i := 0; i < count; i++ {
newPod := api.Pod{
// create a pod with the given phase for the given rs (same selectors and namespace)
func newPod(name string, rs *extensions.ReplicaSet, status api.PodPhase) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%s%d", name, i),
Labels: labelMap,
Name: name,
Namespace: rs.Namespace,
Labels: rs.Spec.Selector.MatchLabels,
},
Status: api.PodStatus{Phase: status},
}
if store != nil {
store.Add(&newPod)
}
pods = append(pods, newPod)
// create count pods with the given phase for the given ReplicaSet (same selectors and namespace), and add them to the store.
func newPodList(store cache.Store, count int, status api.PodPhase, labelMap map[string]string, rs *extensions.ReplicaSet, name string) *api.PodList {
pods := []api.Pod{}
var trueVar = true
controllerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
for i := 0; i < count; i++ {
pod := newPod(fmt.Sprintf("%s%d", name, i), rs, status)
pod.ObjectMeta.Labels = labelMap
pod.OwnerReferences = []api.OwnerReference{controllerReference}
if store != nil {
store.Add(pod)
}
pods = append(pods, *pod)
}
return &api.PodList{
Items: pods,
}
}
func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes int) {
if len(fakePodControl.Templates) != expectedCreates {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.Templates))
func validateSyncReplicaSet(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) {
if e, a := expectedCreates, len(fakePodControl.Templates); e != a {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a)
}
if len(fakePodControl.DeletePodName) != expectedDeletes {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.DeletePodName))
if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a)
}
if e, a := expectedPatches, len(fakePodControl.Patches); e != a {
t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a)
}
}
@ -150,7 +162,7 @@ func TestSyncReplicaSetDoesNothing(t *testing.T) {
manager.podControl = &fakePodControl
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
}
func TestSyncReplicaSetDeletes(t *testing.T) {
@ -167,7 +179,7 @@ func TestSyncReplicaSetDeletes(t *testing.T) {
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rsSpec, "pod")
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 1)
validateSyncReplicaSet(t, &fakePodControl, 0, 1, 0)
}
func TestDeleteFinalStateUnknown(t *testing.T) {
@ -217,7 +229,7 @@ func TestSyncReplicaSetCreates(t *testing.T) {
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.syncReplicaSet(getKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, 2, 0)
validateSyncReplicaSet(t, &fakePodControl, 2, 0, 0)
}
func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
@ -244,7 +256,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
manager.podControl = &fakePodControl
manager.syncReplicaSet(getKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
if fakeHandler.RequestReceived != nil {
t.Errorf("Unexpected update when pods and ReplicaSets are in a steady state")
}
@ -304,7 +316,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
decRc := runtime.EncodeOrDie(testapi.Extensions.Codec(), rs)
fakeHandler.ValidateRequest(t, testapi.Extensions.ResourcePath(replicaSetResourceName(), rs.Namespace, rs.Name)+"/status", "PUT", &decRc)
validateSyncReplicaSet(t, &fakePodControl, 1, 0)
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
}
func TestSyncReplicaSetDormancy(t *testing.T) {
@ -330,13 +342,13 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
// Creates a replica and sets expectations
rsSpec.Status.Replicas = 1
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0)
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
// Expectations prevents replicas but not an update on status
rsSpec.Status.Replicas = 0
fakePodControl.Clear()
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
// Get the key for the controller
rsKey, err := controller.KeyFunc(rsSpec)
@ -352,13 +364,13 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
fakePodControl.Err = fmt.Errorf("Fake Error")
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0)
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
// This replica should not need a Lowering of expectations, since the previous create failed
fakePodControl.Clear()
fakePodControl.Err = nil
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0)
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
// 1 PUT for the ReplicaSet status during dormancy window.
// Note that the pod creates go through pod control so they're not recorded.
@ -716,7 +728,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
expectedPods = int32(burstReplicas)
}
// This validates the ReplicaSet manager sync actually created pods
validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0)
validateSyncReplicaSet(t, &fakePodControl, int(expectedPods), 0, 0)
// This simulates the watch events for all but 1 of the expected pods.
// None of these should wake the controller because it has expectations==BurstReplicas.
@ -727,7 +739,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
podExp, exists, err := manager.expectations.GetExpectations(rsKey)
if !exists || err != nil {
t.Fatalf("Did not find expectations for rc.")
t.Fatalf("Did not find expectations for rs.")
}
if add, _ := podExp.GetExpectations(); add != 1 {
t.Fatalf("Expectations are wrong %v", podExp)
@ -737,7 +749,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
if expectedPods > int32(burstReplicas) {
expectedPods = int32(burstReplicas)
}
validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods))
validateSyncReplicaSet(t, &fakePodControl, 0, int(expectedPods), 0)
// To accurately simulate a watch we must delete the exact pods
// the rs is waiting for.
@ -772,7 +784,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// Check that the ReplicaSet didn't take any action for all the above pods
fakePodControl.Clear()
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
// Create/Delete the last pod
// The last add pod will decrease the expectation of the ReplicaSet to 0,
@ -851,7 +863,7 @@ func TestRSSyncExpectations(t *testing.T) {
},
})
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
}
func TestDeleteControllerAndExpectations(t *testing.T) {
@ -867,7 +879,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
// This should set expectations for the ReplicaSet
manager.syncReplicaSet(getKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0)
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
fakePodControl.Clear()
// Get the ReplicaSet key
@ -893,7 +905,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
podExp.Add(-1, 0)
manager.podStore.Indexer.Replace(make([]interface{}, 0), "0")
manager.syncReplicaSet(getKey(rs, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
}
func TestRSManagerNotReady(t *testing.T) {
@ -911,7 +923,7 @@ func TestRSManagerNotReady(t *testing.T) {
rsKey := getKey(rsSpec, t)
manager.syncReplicaSet(rsKey)
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
validateSyncReplicaSet(t, &fakePodControl, 0, 0, 0)
queueRS, _ := manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
@ -919,7 +931,7 @@ func TestRSManagerNotReady(t *testing.T) {
manager.podStoreSynced = alwaysReady
manager.syncReplicaSet(rsKey)
validateSyncReplicaSet(t, &fakePodControl, 1, 0)
validateSyncReplicaSet(t, &fakePodControl, 1, 0, 0)
}
// shuffle returns a new shuffled list of container controllers.
@ -984,9 +996,9 @@ func TestDeletionTimestamp(t *testing.T) {
// A pod added with a deletion timestamp should decrement deletions, not creations.
manager.addPod(&pod)
queueRC, _ := manager.queue.Get()
if queueRC != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC)
queueRS, _ := manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
}
manager.queue.Done(rsKey)
@ -1001,9 +1013,9 @@ func TestDeletionTimestamp(t *testing.T) {
manager.expectations.ExpectDeletions(rsKey, []string{controller.PodKey(&pod)})
manager.updatePod(&oldPod, &pod)
queueRC, _ = manager.queue.Get()
if queueRC != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC)
queueRS, _ = manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
}
manager.queue.Done(rsKey)
@ -1041,9 +1053,9 @@ func TestDeletionTimestamp(t *testing.T) {
// Deleting the second pod should clear expectations.
manager.deletePod(secondPod)
queueRC, _ = manager.queue.Get()
if queueRC != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRC)
queueRS, _ = manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
}
manager.queue.Done(rsKey)
@ -1052,3 +1064,202 @@ func TestDeletionTimestamp(t *testing.T) {
t.Fatalf("Wrong expectations %+v", podExp)
}
}
// setupManagerWithGCEnabled creates a RS manager with a fakePodControl
// and with garbageCollectorEnabled set to true
func setupManagerWithGCEnabled() (manager *ReplicaSetController, fakePodControl *controller.FakePodControl) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl = &controller.FakePodControl{}
manager = NewReplicaSetControllerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.garbageCollectorEnabled = true
manager.podStoreSynced = alwaysReady
manager.podControl = fakePodControl
return manager, fakePodControl
}
func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
var trueVar = true
otherControllerReference := api.OwnerReference{UID: util.NewUUID(), APIVersion: "v1beta1", Kind: "ReplicaSet", Name: "AnotherRS", Controller: &trueVar}
// add to podStore a matching Pod controlled by another controller. Expect no patch.
pod := newPod("pod", rs, api.PodRunning)
pod.OwnerReferences = []api.OwnerReference{otherControllerReference}
manager.podStore.Indexer.Add(pod)
err := manager.syncReplicaSet(getKey(rs, t))
if err != nil {
t.Fatal(err)
}
// because the matching pod already has a controller, so 2 pods should be created.
validateSyncReplicaSet(t, fakePodControl, 2, 0, 0)
}
func TestPatchPodWithOtherOwnerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
// add to podStore one more matching pod that doesn't have a controller
// ref, but has an owner ref pointing to other object. Expect a patch to
// take control of it.
unrelatedOwnerReference := api.OwnerReference{UID: util.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
pod := newPod("pod", rs, api.PodRunning)
pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference}
manager.podStore.Indexer.Add(pod)
err := manager.syncReplicaSet(getKey(rs, t))
if err != nil {
t.Fatal(err)
}
// 1 patch to take control of pod, and 1 create of new pod.
validateSyncReplicaSet(t, fakePodControl, 1, 0, 1)
}
func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
// add to podStore a matching pod that has an ownerRef pointing to the rs,
// but ownerRef.Controller is false. Expect a patch to take control it.
rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1", Kind: "ReplicaSet", Name: rs.Name}
pod := newPod("pod", rs, api.PodRunning)
pod.OwnerReferences = []api.OwnerReference{rsOwnerReference}
manager.podStore.Indexer.Add(pod)
err := manager.syncReplicaSet(getKey(rs, t))
if err != nil {
t.Fatal(err)
}
// 1 patch to take control of pod, and 1 create of new pod.
validateSyncReplicaSet(t, fakePodControl, 1, 0, 1)
}
func TestPatchPodFails(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
// add to podStore two matching pods. Expect two patches to take control
// them.
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning))
// let both patches fail. The rs controller will assume it fails to take
// control of the pods and create new ones.
fakePodControl.Err = fmt.Errorf("Fake Error")
err := manager.syncReplicaSet(getKey(rs, t))
if err != nil {
t.Fatal(err)
}
// 2 patches to take control of pod1 and pod2 (both fail), 2 creates.
validateSyncReplicaSet(t, fakePodControl, 2, 0, 2)
}
func TestPatchExtraPodsThenDelete(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
// add to podStore three matching pods. Expect three patches to take control
// them, and later delete one of them.
manager.podStore.Indexer.Add(newPod("pod1", rs, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod2", rs, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod3", rs, api.PodRunning))
err := manager.syncReplicaSet(getKey(rs, t))
if err != nil {
t.Fatal(err)
}
// 3 patches to take control of the pods, and 1 deletion because there is an extra pod.
validateSyncReplicaSet(t, fakePodControl, 0, 1, 3)
}
func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
manager.rsStore.Store.Add(rs)
// put one pod in the podStore
pod := newPod("pod", rs, api.PodRunning)
var trueVar = true
rsOwnerReference := api.OwnerReference{UID: rs.UID, APIVersion: "v1beta1", Kind: "ReplicaSet", Name: rs.Name, Controller: &trueVar}
pod.OwnerReferences = []api.OwnerReference{rsOwnerReference}
updatedPod := *pod
// reset the labels
updatedPod.Labels = make(map[string]string)
// add the updatedPod to the store. This is consistent with the behavior of
// the Informer: Informer updates the store before call the handler
// (updatePod() in this case).
manager.podStore.Indexer.Add(&updatedPod)
// send a update of the same pod with modified labels
manager.updatePod(pod, &updatedPod)
// verifies that rs is added to the queue
rsKey := getKey(rs, t)
queueRS, _ := manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
}
manager.queue.Done(queueRS)
err := manager.syncReplicaSet(rsKey)
if err != nil {
t.Fatal(err)
}
// expect 1 patch to be sent to remove the controllerRef for the pod.
// expect 2 creates because the rs.Spec.Replicas=2 and there exists no
// matching pod.
validateSyncReplicaSet(t, fakePodControl, 2, 0, 1)
fakePodControl.Clear()
}
func TestUpdateSelectorControllerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
// put 2 pods in the podStore
newPodList(manager.podStore.Indexer, 2, api.PodRunning, labelMap, rs, "pod")
// update the RS so that its selector no longer matches the pods
updatedRS := *rs
updatedRS.Spec.Selector.MatchLabels = map[string]string{"foo": "baz"}
// put the updatedRS into the store. This is consistent with the behavior of
// the Informer: Informer updates the store before call the handler
// (updateRS() in this case).
manager.rsStore.Store.Add(&updatedRS)
manager.updateRS(rs, &updatedRS)
// verifies that the rs is added to the queue
rsKey := getKey(rs, t)
queueRS, _ := manager.queue.Get()
if queueRS != rsKey {
t.Fatalf("Expected to find key %v in queue, found %v", rsKey, queueRS)
}
manager.queue.Done(queueRS)
err := manager.syncReplicaSet(rsKey)
if err != nil {
t.Fatal(err)
}
// expect 2 patches to be sent to remove the controllerRef for the pods.
// expect 2 creates because the rc.Spec.Replicas=2 and there exists no
// matching pod.
validateSyncReplicaSet(t, fakePodControl, 2, 0, 2)
fakePodControl.Clear()
}
// RS controller shouldn't adopt or create more pods if the rc is about to be
// deleted.
func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
labelMap := map[string]string{"foo": "bar"}
rs := newReplicaSet(2, labelMap)
now := unversioned.Now()
rs.DeletionTimestamp = &now
manager.rsStore.Store.Add(rs)
pod1 := newPod("pod1", rs, api.PodRunning)
manager.podStore.Indexer.Add(pod1)
// no patch, no create
err := manager.syncReplicaSet(getKey(rs, t))
if err != nil {
t.Fatal(err)
}
validateSyncReplicaSet(t, fakePodControl, 0, 0, 0)
}

View File

@ -615,12 +615,6 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
trace.Step("ReplicationController restored")
rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey)
trace.Step("Expectations restored")
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
return err
}
trace.Step("Pods listed")
// TODO: Do the List and Filter in a single pass, or use an index.
var filteredPods []*api.Pod
@ -653,7 +647,7 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
for _, pod := range controlledDoesNotMatch {
err := cm.ReleasePod(pod)
if err != nil {
errlist = append(errlist, cm.ReleasePod(pod))
errlist = append(errlist, err)
}
}
if len(errlist) != 0 {

View File

@ -0,0 +1,464 @@
// +build integration,!no-etcd
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replicaset
import (
"fmt"
"net/http/httptest"
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
internalclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
"k8s.io/kubernetes/pkg/client/restclient"
controllerframwork "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/replicaset"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/integration/framework"
)
func testLabels() map[string]string {
return map[string]string{"name": "test"}
}
func newRS(name, namespace string, replicas int) *v1beta1.ReplicaSet {
replicasCopy := int32(replicas)
return &v1beta1.ReplicaSet{
TypeMeta: unversioned.TypeMeta{
Kind: "ReplicaSet",
APIVersion: "extensions/v1beta1",
},
ObjectMeta: v1.ObjectMeta{
Namespace: namespace,
Name: name,
},
Spec: v1beta1.ReplicaSetSpec{
Selector: &v1beta1.LabelSelector{
MatchLabels: testLabels(),
},
Replicas: &replicasCopy,
Template: v1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: testLabels(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fake-name",
Image: "fakeimage",
},
},
},
},
},
}
}
func newMatchingPod(podName, namespace string) *v1.Pod {
return &v1.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: v1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: testLabels(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fake-name",
Image: "fakeimage",
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
}
// verifyRemainingObjects verifies if the number of the remaining replica
// sets and pods are rsNum and podNum. It returns error if the
// communication with the API server fails.
func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespace string, rsNum, podNum int) (bool, error) {
rsClient := clientSet.Extensions().ReplicaSets(namespace)
podClient := clientSet.Core().Pods(namespace)
pods, err := podClient.List(api.ListOptions{})
if err != nil {
return false, fmt.Errorf("Failed to list pods: %v", err)
}
var ret = true
if len(pods.Items) != podNum {
ret = false
t.Logf("expect %d pods, got %d pods", podNum, len(pods.Items))
}
rss, err := rsClient.List(api.ListOptions{})
if err != nil {
return false, fmt.Errorf("Failed to list replica sets: %v", err)
}
if len(rss.Items) != rsNum {
ret = false
t.Logf("expect %d RSs, got %d RSs", rsNum, len(rss.Items))
}
return ret, nil
}
func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replicaset.ReplicaSetController, controllerframwork.SharedIndexInformer, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.EnableCoreControllers = false
_, s := framework.RunAMaster(masterConfig)
config := restclient.Config{Host: s.URL}
clientSet, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("Error in create clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
resyncPeriodFunc := func() time.Duration {
return resyncPeriod
}
podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
rm := replicaset.NewReplicaSetController(
podInformer,
internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replicaset-controller")),
resyncPeriodFunc,
replicaset.BurstReplicas,
4096,
enableGarbageCollector,
)
if err != nil {
t.Fatalf("Failed to create replicaset controller")
}
return s, rm, podInformer, clientSet
}
func TestAdoption(t *testing.T) {
var trueVar = true
testCases := []struct {
name string
existingOwnerReferences func(rs *v1beta1.ReplicaSet) []v1.OwnerReference
expectedOwnerReferences func(rs *v1beta1.ReplicaSet) []v1.OwnerReference
}{
{
"pod refers rs as an owner, not a controller",
func(rs *v1beta1.ReplicaSet) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet"}}
},
func(rs *v1beta1.ReplicaSet) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}}
},
},
{
"pod doesn't have owner references",
func(rs *v1beta1.ReplicaSet) []v1.OwnerReference {
return []v1.OwnerReference{}
},
func(rs *v1beta1.ReplicaSet) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}}
},
},
{
"pod refers rs as a controller",
func(rs *v1beta1.ReplicaSet) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}}
},
func(rs *v1beta1.ReplicaSet) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar}}
},
},
{
"pod refers other rs as the controller, refers the rs as an owner",
func(rs *v1beta1.ReplicaSet) []v1.OwnerReference {
return []v1.OwnerReference{
{UID: "1", Name: "anotherRS", APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar},
{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet"},
}
},
func(rs *v1beta1.ReplicaSet) []v1.OwnerReference {
return []v1.OwnerReference{
{UID: "1", Name: "anotherRS", APIVersion: "extensions/v1beta1", Kind: "ReplicaSet", Controller: &trueVar},
{UID: rs.UID, Name: rs.Name, APIVersion: "extensions/v1beta1", Kind: "ReplicaSet"},
}
},
},
}
for i, tc := range testCases {
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace(fmt.Sprintf("rs-adoption-%d", i), s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rsClient := clientSet.Extensions().ReplicaSets(ns.Name)
podClient := clientSet.Core().Pods(ns.Name)
const rsName = "rs"
rs, err := rsClient.Create(newRS(rsName, ns.Name, 1))
if err != nil {
t.Fatalf("Failed to create replica set: %v", err)
}
podName := fmt.Sprintf("pod%d", i)
pod := newMatchingPod(podName, ns.Name)
pod.OwnerReferences = tc.existingOwnerReferences(rs)
_, err = podClient.Create(pod)
if err != nil {
t.Fatalf("Failed to create Pod: %v", err)
}
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
// wait for the podInformer to observe the pod, otherwise the rs controller
// will try to create a new pod rather than adopting the existing one.
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
objects := podInformer.GetIndexer().List()
for _, object := range objects {
pod, ok := object.(*api.Pod)
if !ok {
t.Fatal("expect object to be a pod")
}
if pod.Name == podName {
return true, nil
}
}
return false, nil
}); err != nil {
t.Fatal(err)
}
go rm.Run(5, stopCh)
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
updatedPod, err := podClient.Get(pod.Name)
if err != nil {
return false, err
}
if e, a := tc.expectedOwnerReferences(rs), updatedPod.OwnerReferences; reflect.DeepEqual(e, a) {
return true, nil
} else {
t.Logf("ownerReferences don't match, expect %v, got %v", e, a)
return false, nil
}
}); err != nil {
t.Fatal(err)
}
close(stopCh)
}
}
func createRSsPods(t *testing.T, clientSet clientset.Interface, rss []*v1beta1.ReplicaSet, pods []*v1.Pod, ns string) {
rsClient := clientSet.Extensions().ReplicaSets(ns)
podClient := clientSet.Core().Pods(ns)
for _, rs := range rss {
if _, err := rsClient.Create(rs); err != nil {
t.Fatalf("Failed to create replica set %s: %v", rs.Name, err)
}
}
for _, pod := range pods {
if _, err := podClient.Create(pod); err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}
}
}
func waitRSStable(t *testing.T, clientSet clientset.Interface, rs *v1beta1.ReplicaSet, ns string) {
rsClient := clientSet.Extensions().ReplicaSets(ns)
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
updatedRS, err := rsClient.Get(rs.Name)
if err != nil {
return false, err
}
if updatedRS.Status.Replicas != *rs.Spec.Replicas {
return false, nil
} else {
return true, nil
}
}); err != nil {
t.Fatal(err)
}
}
func TestUpdateSelectorToAdopt(t *testing.T) {
// We have pod1, pod2 and rs. rs.spec.replicas=1. At first rs.Selector
// matches pod1 only; change the selector to match pod2 as well. Verify
// there is only one pod left.
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace("rs-update-selector-to-adopt", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rs := newRS("rs", ns.Name, 1)
// let rs's selector only match pod1
rs.Spec.Selector.MatchLabels["uniqueKey"] = "1"
rs.Spec.Template.Labels["uniqueKey"] = "1"
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRSStable(t, clientSet, rs, ns.Name)
// change the rs's selector to match both pods
patch := `{"spec":{"selector":{"matchLabels": {"uniqueKey":null}}}}`
rsClient := clientSet.Extensions().ReplicaSets(ns.Name)
rs, err := rsClient.Patch(rs.Name, api.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch replica set: %v", err)
}
t.Logf("patched rs = %#v", rs)
// wait for the rs select both pods and delete one of them
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1)
}); err != nil {
t.Fatal(err)
}
close(stopCh)
}
func TestUpdateSelectorToRemoveControllerRef(t *testing.T) {
// We have pod1, pod2 and rs. rs.spec.replicas=2. At first rs.Selector
// matches pod1 and pod2; change the selector to match only pod1. Verify
// that rs creates one more pod, so there are 3 pods. Also verify that
// pod2's controllerRef is cleared.
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace("rs-update-selector-to-remove-controllerref", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rs := newRS("rs", ns.Name, 2)
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRSStable(t, clientSet, rs, ns.Name)
// change the rs's selector to match both pods
patch := `{"spec":{"selector":{"matchLabels": {"uniqueKey":"1"}},"template":{"metadata":{"labels":{"uniqueKey":"1"}}}}}`
rsClient := clientSet.Extensions().ReplicaSets(ns.Name)
rs, err := rsClient.Patch(rs.Name, api.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch replica set: %v", err)
}
t.Logf("patched rs = %#v", rs)
// wait for the rs to create one more pod
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3)
}); err != nil {
t.Fatal(err)
}
podClient := clientSet.Core().Pods(ns.Name)
pod2, err = podClient.Get(pod2.Name)
if err != nil {
t.Fatalf("Failed to get pod2: %v", err)
}
if len(pod2.OwnerReferences) != 0 {
t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences)
}
close(stopCh)
}
func TestUpdateLabelToRemoveControllerRef(t *testing.T) {
// We have pod1, pod2 and rs. rs.spec.replicas=2. At first rs.Selector
// matches pod1 and pod2; change pod2's lables to non-matching. Verify
// that rs creates one more pod, so there are 3 pods. Also verify that
// pod2's controllerRef is cleared.
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace("rs-update-label-to-remove-controllerref", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rs := newRS("rs", ns.Name, 2)
pod1 := newMatchingPod("pod1", ns.Name)
pod2 := newMatchingPod("pod2", ns.Name)
createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRSStable(t, clientSet, rs, ns.Name)
// change the rs's selector to match both pods
patch := `{"metadata":{"labels":{"name":null}}}`
podClient := clientSet.Core().Pods(ns.Name)
pod2, err := podClient.Patch(pod2.Name, api.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch pod2: %v", err)
}
t.Logf("patched pod2 = %#v", pod2)
// wait for the rs to create one more pod
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3)
}); err != nil {
t.Fatal(err)
}
pod2, err = podClient.Get(pod2.Name)
if err != nil {
t.Fatalf("Failed to get pod2: %v", err)
}
if len(pod2.OwnerReferences) != 0 {
t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences)
}
close(stopCh)
}
func TestUpdateLabelToBeAdopted(t *testing.T) {
// We have pod1, pod2 and rs. rs.spec.replicas=1. At first rs.Selector
// matches pod1 only; change pod2's lables to be matching. Verify the RS
// controller adopts pod2 and delete one of them, so there is only 1 pod
// left.
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace("rs-update-label-to-be-adopted", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rs := newRS("rs", ns.Name, 1)
// let rs's selector only matches pod1
rs.Spec.Selector.MatchLabels["uniqueKey"] = "1"
rs.Spec.Template.Labels["uniqueKey"] = "1"
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRSsPods(t, clientSet, []*v1beta1.ReplicaSet{rs}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRSStable(t, clientSet, rs, ns.Name)
// change the rs's selector to match both pods
patch := `{"metadata":{"labels":{"uniqueKey":"1"}}}`
podClient := clientSet.Core().Pods(ns.Name)
pod2, err := podClient.Patch(pod2.Name, api.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch pod2: %v", err)
}
t.Logf("patched pod2 = %#v", pod2)
// wait for the rs to select both pods and delete one of them
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1)
}); err != nil {
t.Fatal(err)
}
close(stopCh)
}

View File

@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
package replicationcontroller
import (
"fmt"