let RC manager utilize the GC

Chao Xu 2016-06-10 16:28:42 -07:00
parent bea5232790
commit 11a341de67
10 changed files with 1033 additions and 103 deletions

View File

@ -215,6 +215,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
ResyncPeriod(s),
replicationcontroller.BurstReplicas,
int(s.LookupCacheSizeForRC),
s.EnableGarbageCollector,
).Run(int(s.ConcurrentRCSyncs), wait.NeverStop)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

View File

@ -0,0 +1,144 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"strings"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/labels"
)
type PodControllerRefManager struct {
podControl PodControlInterface
controllerObject api.ObjectMeta
controllerSelector labels.Selector
controllerKind unversioned.GroupVersionKind
}
// NewPodControllerRefManager returns a PodControllerRefManager that exposes
// methods to manage the controllerRef of pods.
func NewPodControllerRefManager(
podControl PodControlInterface,
controllerObject api.ObjectMeta,
controllerSelector labels.Selector,
controllerKind unversioned.GroupVersionKind,
) *PodControllerRefManager {
return &PodControllerRefManager{podControl, controllerObject, controllerSelector, controllerKind}
}
// Classify first filters out inactive pods, then it classifies the remaining
// pods into three categories:
// 1. matchesAndControlled are the pods whose labels match the selector of the
//    RC, and have a controllerRef pointing to the controller;
// 2. matchesNeedsController are the pods whose labels match the RC, but don't
//    have a controllerRef (pods with matching labels but with a controllerRef
//    pointing to another object are ignored);
// 3. controlledDoesNotMatch are the pods that have a controllerRef pointing to
//    the controller, but whose labels no longer match the selector.
func (m *PodControllerRefManager) Classify(pods []api.Pod) (
matchesAndControlled []*api.Pod,
matchesNeedsController []*api.Pod,
controlledDoesNotMatch []*api.Pod) {
for i := range pods {
pod := pods[i]
if !IsPodActive(pod) {
glog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
pod.Namespace, pod.Name, pod.Status.Phase, pod.DeletionTimestamp)
continue
}
controllerRef := getControllerOf(pod.ObjectMeta)
if controllerRef != nil {
if controllerRef.UID == m.controllerObject.UID {
// already controlled
if m.controllerSelector.Matches(labels.Set(pod.Labels)) {
matchesAndControlled = append(matchesAndControlled, &pod)
} else {
controlledDoesNotMatch = append(controlledDoesNotMatch, &pod)
}
} else {
// ignore the pod; it's controlled by another controller
glog.V(4).Infof("Ignoring pod %v/%v, it's owned by [%s/%s, name: %s, uid: %s]",
pod.Namespace, pod.Name, controllerRef.APIVersion, controllerRef.Kind, controllerRef.Name, controllerRef.UID)
continue
}
} else {
if !m.controllerSelector.Matches(labels.Set(pod.Labels)) {
continue
}
matchesNeedsController = append(matchesNeedsController, &pod)
}
}
return matchesAndControlled, matchesNeedsController, controlledDoesNotMatch
}
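A minimal illustration of the three buckets (an example, not part of the commit), assuming an RC with UID rc-1 and selector app=web:

// pod A: labels app=web, controllerRef -> rc-1        => matchesAndControlled
// pod B: labels app=web, no controllerRef             => matchesNeedsController
// pod C: labels app=web, controllerRef -> another UID => ignored
// pod D: labels app=db,  controllerRef -> rc-1        => controlledDoesNotMatch

The sync loop later in this diff adopts the matchesNeedsController pods and releases the controlledDoesNotMatch pods.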
// getControllerOf returns the controllerRef if controllee has a controller,
// otherwise returns nil.
func getControllerOf(controllee api.ObjectMeta) *api.OwnerReference {
for _, owner := range controllee.OwnerReferences {
// the controllee is controlled by this owner
if owner.Controller != nil && *owner.Controller == true {
return &owner
}
}
return nil
}
// AdoptPod sends a patch to take control of the pod. It returns the error if
// the patching fails.
func (m *PodControllerRefManager) AdoptPod(pod *api.Pod) error {
// we should not adopt any pods if the controller is about to be deleted
if m.controllerObject.DeletionTimestamp != nil {
return fmt.Errorf("cancel the adopt attempt for pod %s because the controlller is being deleted",
strings.Join([]string{pod.Namespace, pod.Name, string(pod.UID)}, "_"))
}
addControllerPatch := fmt.Sprintf(
`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`,
m.controllerKind.GroupVersion(), m.controllerKind.Kind,
m.controllerObject.Name, m.controllerObject.UID, pod.UID)
return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch))
}
// ReleasePod sends a patch to free the pod from the control of the controller.
// It returns the error if the patching fails. 404 and 422 errors are ignored.
func (m *PodControllerRefManager) ReleasePod(pod *api.Pod) error {
glog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s",
pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.controllerObject.Name)
deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, m.controllerObject.UID, pod.UID)
err := m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(deleteOwnerRefPatch))
if err != nil {
if errors.IsNotFound(err) {
// If the pod no longer exists, ignore it.
return nil
}
if errors.IsInvalid(err) {
// An Invalid error is returned in two cases: 1. the pod
// has no owner reference; 2. the uid of the pod doesn't
// match, which means the pod was deleted and then recreated.
// In both cases, the error can be ignored.
// TODO: If the pod has owner references, but none of them
// has the owner.UID, server will silently ignore the patch.
// Investigate why.
return nil
}
}
return err
}
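For concreteness, here is a sketch of the strategic-merge-patch bodies the two methods generate, assuming a hypothetical RC named "frontend" with UID "rc-uid" and a pod with UID "pod-uid":

// AdoptPod:
//   {"metadata":{"ownerReferences":[{"apiVersion":"v1","kind":"ReplicationController","name":"frontend","uid":"rc-uid","controller":true}],"uid":"pod-uid"}}
// ReleasePod:
//   {"metadata":{"ownerReferences":[{"$patch":"delete","uid":"rc-uid"}],"uid":"pod-uid"}}

Embedding the pod's own uid in the patch body makes the patch fail if the pod was deleted and recreated under a different uid; that is the Invalid (422) case ReleasePod deliberately ignores.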

View File

@ -349,8 +349,12 @@ type PodControlInterface interface {
CreatePods(namespace string, template *api.PodTemplateSpec, object runtime.Object) error
// CreatePodsOnNode creates a new pod according to the spec on the specified node.
CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error
// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
CreatePodsWithControllerRef(namespace string, template *api.PodTemplateSpec, object runtime.Object, controllerRef *api.OwnerReference) error
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error
// PatchPod patches the pod.
PatchPod(namespace, name string, data []byte) error
}
// RealPodControl is the default implementation of PodControlInterface.
@ -404,14 +408,35 @@ func getPodsPrefix(controllerName string) string {
}
func (r RealPodControl) CreatePods(namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
return r.createPods("", namespace, template, object)
return r.createPods("", namespace, template, object, nil)
}
func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *api.PodTemplateSpec, controllerObject runtime.Object, controllerRef *api.OwnerReference) error {
if controllerRef == nil {
return fmt.Errorf("controllerRef is nil")
}
if len(controllerRef.APIVersion) == 0 {
return fmt.Errorf("controllerRef has empty APIVersion")
}
if len(controllerRef.Kind) == 0 {
return fmt.Errorf("controllerRef has empty Kind")
}
if controllerRef.Controller == nil || *controllerRef.Controller != true {
return fmt.Errorf("controllerRef.Controller is not set")
}
return r.createPods("", namespace, template, controllerObject, controllerRef)
}
func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
return r.createPods(nodeName, namespace, template, object)
return r.createPods(nodeName, namespace, template, object, nil)
}
func GetPodFromTemplate(template *api.PodTemplateSpec, parentObject runtime.Object) (*api.Pod, error) {
func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
_, err := r.KubeClient.Core().Pods(namespace).Patch(name, api.StrategicMergePatchType, data)
return err
}
func GetPodFromTemplate(template *api.PodTemplateSpec, parentObject runtime.Object, controllerRef *api.OwnerReference) (*api.Pod, error) {
desiredLabels := getPodsLabelSet(template)
desiredAnnotations, err := getPodsAnnotationSet(template, parentObject)
if err != nil {
@ -430,14 +455,17 @@ func GetPodFromTemplate(template *api.PodTemplateSpec, parentObject runtime.Obje
GenerateName: prefix,
},
}
if controllerRef != nil {
pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)
}
if err := api.Scheme.Convert(&template.Spec, &pod.Spec); err != nil {
return nil, fmt.Errorf("unable to convert pod template: %v", err)
}
return pod, nil
}
func (r RealPodControl) createPods(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
pod, err := GetPodFromTemplate(template, object)
func (r RealPodControl) createPods(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object, controllerRef *api.OwnerReference) error {
pod, err := GetPodFromTemplate(template, object, controllerRef)
if err != nil {
return err
}
@ -479,40 +507,63 @@ func (r RealPodControl) DeletePod(namespace string, podID string, object runtime
type FakePodControl struct {
sync.Mutex
Templates []api.PodTemplateSpec
DeletePodName []string
Err error
Templates []api.PodTemplateSpec
ControllerRefs []api.OwnerReference
DeletePodName []string
Patches [][]byte
Err error
}
var _ PodControlInterface = &FakePodControl{}
func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error {
func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error {
f.Lock()
defer f.Unlock()
f.Patches = append(f.Patches, data)
if f.Err != nil {
return f.Err
}
return nil
}
func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error {
f.Lock()
defer f.Unlock()
f.Templates = append(f.Templates, *spec)
if f.Err != nil {
return f.Err
}
return nil
}
func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *api.PodTemplateSpec, object runtime.Object, controllerRef *api.OwnerReference) error {
f.Lock()
defer f.Unlock()
f.Templates = append(f.Templates, *spec)
f.ControllerRefs = append(f.ControllerRefs, *controllerRef)
if f.Err != nil {
return f.Err
}
return nil
}
func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *api.PodTemplateSpec, object runtime.Object) error {
f.Lock()
defer f.Unlock()
f.Templates = append(f.Templates, *template)
if f.Err != nil {
return f.Err
}
f.Templates = append(f.Templates, *template)
return nil
}
func (f *FakePodControl) DeletePod(namespace string, podID string, object runtime.Object) error {
f.Lock()
defer f.Unlock()
f.DeletePodName = append(f.DeletePodName, podID)
if f.Err != nil {
return f.Err
}
f.DeletePodName = append(f.DeletePodName, podID)
return nil
}
@ -521,6 +572,8 @@ func (f *FakePodControl) Clear() {
defer f.Unlock()
f.DeletePodName = []string{}
f.Templates = []api.PodTemplateSpec{}
f.ControllerRefs = []api.OwnerReference{}
f.Patches = [][]byte{}
}
// ByLogging allows custom sorting of pods so the best one can be picked for getting its logs.

View File

@ -253,7 +253,9 @@ func TestCreatePods(t *testing.T) {
controllerSpec := newReplicationController(1)
// Make sure createReplica sends a POST to the apiserver with a pod from the controller's pod template
podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec)
if err := podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec); err != nil {
t.Fatalf("unexpected error: %v", err)
}
expectedPod := api.Pod{
ObjectMeta: api.ObjectMeta{
@ -265,7 +267,7 @@ func TestCreatePods(t *testing.T) {
fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath("pods", api.NamespaceDefault, ""), "POST", nil)
actualPod, err := runtime.Decode(testapi.Default.Codec(), []byte(fakeHandler.RequestBody))
if err != nil {
t.Errorf("Unexpected error: %#v", err)
t.Fatalf("Unexpected error: %v", err)
}
if !api.Semantic.DeepDerivative(&expectedPod, actualPod) {
t.Logf("Body: %s", fakeHandler.RequestBody)

View File

@ -164,7 +164,7 @@ func TestControllerSyncJob(t *testing.T) {
"too few active pods, with controller error": {
2, 5, false,
fmt.Errorf("Fake error"), 0, 1, 1, 0,
0, 0, 1, 1, 0, false,
1, 0, 1, 1, 0, false,
},
"too many active pods": {
2, 5, false,
@ -174,7 +174,7 @@ func TestControllerSyncJob(t *testing.T) {
"too many active pods, with controller error": {
2, 5, false,
fmt.Errorf("Fake error"), 0, 3, 0, 0,
0, 0, 3, 0, 0, false,
0, 1, 3, 0, 0, false,
},
"failed pod": {
2, 5, false,

View File

@ -28,7 +28,7 @@ import (
// newPCB generates a new PCB using the id string as a unique qualifier
func newPCB(id string, ps *apps.PetSet) (*pcb, error) {
petPod, err := controller.GetPodFromTemplate(&ps.Spec.Template, ps)
petPod, err := controller.GetPodFromTemplate(&ps.Spec.Template, ps, nil)
if err != nil {
return nil, err
}

View File

@ -352,9 +352,10 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
fakePodControl.Err = fmt.Errorf("Fake Error")
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 0, 0)
validateSyncReplicaSet(t, &fakePodControl, 1, 0)
// This replica should not need a Lowering of expectations, since the previous create failed
fakePodControl.Clear()
fakePodControl.Err = nil
manager.syncReplicaSet(getKey(rsSpec, t))
validateSyncReplicaSet(t, &fakePodControl, 1, 0)

View File

@ -26,6 +26,9 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
@ -36,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/metrics"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@ -65,6 +69,10 @@ const (
statusUpdateRetries = 1
)
func getRCKind() unversioned.GroupVersionKind {
return v1.SchemeGroupVersion.WithKind("ReplicationController")
}
// ReplicationManager is responsible for synchronizing ReplicationController objects stored
// in the system with actual running pods.
// TODO: this really should be called ReplicationController. The only reason why it's a Manager
@ -105,20 +113,24 @@ type ReplicationManager struct {
// Controllers that need to be synced
queue *workqueue.Type
// garbageCollectorEnabled denotes if the garbage collector is enabled. RC
// manager behaves differently if GC is enabled.
garbageCollectorEnabled bool
}
// NewReplicationManager creates a replication manager
func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
return newReplicationManagerInternal(
return newReplicationManager(
eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
}
// newReplicationManagerInternal configures a replication manager with the specified event recorder
func newReplicationManagerInternal(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
// newReplicationManager configures a replication manager with the specified event recorder
func newReplicationManager(eventRecorder record.EventRecorder, podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int, garbageCollectorEnabled bool) *ReplicationManager {
if kubeClient != nil && kubeClient.Core().GetRESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("replication_controller", kubeClient.Core().GetRESTClient().GetRateLimiter())
}
@ -132,6 +144,7 @@ func newReplicationManagerInternal(eventRecorder record.EventRecorder, podInform
burstReplicas: burstReplicas,
expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
queue: workqueue.New(),
garbageCollectorEnabled: garbageCollectorEnabled,
}
rm.rcStore.Indexer, rm.rcController = framework.NewIndexerInformer(
@ -147,43 +160,8 @@ func newReplicationManagerInternal(eventRecorder record.EventRecorder, podInform
// TODO: Can we have much longer period here?
FullControllerResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
UpdateFunc: func(old, cur interface{}) {
oldRC := old.(*api.ReplicationController)
curRC := cur.(*api.ReplicationController)
// We should invalidate the whole lookup cache if an RC's selector has been updated.
//
// Imagine that you have two RCs:
// * old RC1
// * new RC2
// You also have a pod that is attached to RC2 (because it doesn't match RC1's selector).
// Now imagine that you are changing RC1's selector so that it now matches that pod;
// in such a case, we must invalidate the whole cache so that the pod can be adopted by RC1.
//
// This makes the lookup cache less helpful, but selector updates do not happen often,
// so it's not a big problem.
if !reflect.DeepEqual(oldRC.Spec.Selector, curRC.Spec.Selector) {
rm.lookupCache.InvalidateAll()
}
// You might imagine that we only really need to enqueue the
// controller when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer
// resync can requeue any controllers that don't yet have pods
// but whose last attempts at creating a pod have failed (since
// we don't block on creation of pods) instead of those
// controllers stalling indefinitely. Enqueueing every time
// does result in some spurious syncs (like when Status.Replicas
// is updated and the watch notification from it retriggers
// this function), but in general extra resyncs shouldn't be
// that bad as rcs that haven't met expectations yet won't
// sync, and all the listing is done using local stores.
if oldRC.Status.Replicas != curRC.Status.Replicas {
glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
}
rm.enqueueController(cur)
},
AddFunc: rm.enqueueController,
UpdateFunc: rm.updateRC,
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
// way of achieving this is by performing a `stop` operation on the controller.
@ -212,7 +190,8 @@ func newReplicationManagerInternal(eventRecorder record.EventRecorder, podInform
// NewReplicationManagerFromClientForIntegration creates a new ReplicationManager that runs its own informer. It disables event recording for use in integration tests.
func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
rm := newReplicationManagerInternal(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
garbageCollectorEnabled := false
rm := newReplicationManager(&record.FakeRecorder{}, podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rm.internalPodInformer = podInformer
return rm
}
@ -220,7 +199,8 @@ func NewReplicationManagerFromClientForIntegration(kubeClient clientset.Interfac
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
garbageCollectorEnabled := false
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize, garbageCollectorEnabled)
rm.internalPodInformer = podInformer
return rm
@ -255,13 +235,14 @@ func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) {
// getPodController returns the controller managing the given pod.
// TODO: Surface that we are ignoring multiple controllers for a single pod.
// TODO: use ownerReference.Controller to determine if the rc controls the pod.
func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationController {
// look up in the cache, if cached and the cache is valid, just return cached value
if obj, cached := rm.lookupCache.GetMatchingObject(pod); cached {
controller, ok := obj.(*api.ReplicationController)
if !ok {
// This should not happen
glog.Errorf("lookup cache does not retuen a ReplicationController object")
glog.Errorf("lookup cache does not return a ReplicationController object")
return nil
}
if cached && rm.isCacheValid(pod, controller) {
@ -320,6 +301,44 @@ func isControllerMatch(pod *api.Pod, rc *api.ReplicationController) bool {
return true
}
// callback when RC is updated
func (rm *ReplicationManager) updateRC(old, cur interface{}) {
oldRC := old.(*api.ReplicationController)
curRC := cur.(*api.ReplicationController)
// We should invalidate the whole lookup cache if an RC's selector has been updated.
//
// Imagine that you have two RCs:
// * old RC1
// * new RC2
// You also have a pod that is attached to RC2 (because it doesn't match RC1's selector).
// Now imagine that you are changing RC1's selector so that it now matches that pod;
// in such a case, we must invalidate the whole cache so that the pod can be adopted by RC1.
//
// This makes the lookup cache less helpful, but selector updates do not happen often,
// so it's not a big problem.
if !reflect.DeepEqual(oldRC.Spec.Selector, curRC.Spec.Selector) {
rm.lookupCache.InvalidateAll()
}
// You might imagine that we only really need to enqueue the
// controller when Spec changes, but it is safer to sync any
// time this function is triggered. That way a full informer
// resync can requeue any controllers that don't yet have pods
// but whose last attempts at creating a pod have failed (since
// we don't block on creation of pods) instead of those
// controllers stalling indefinitely. Enqueueing every time
// does result in some spurious syncs (like when Status.Replicas
// is updated and the watch notification from it retriggers
// this function), but in general extra resyncs shouldn't be
// that bad as rcs that haven't met expectations yet won't
// sync, and all the listing is done using local stores.
if oldRC.Status.Replicas != curRC.Status.Replicas {
glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas)
}
rm.enqueueController(cur)
}
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (rm *ReplicationManager) addPod(obj interface{}) {
pod := obj.(*api.Pod)
@ -370,10 +389,8 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
return
}
if rc := rm.getPodController(curPod); rc != nil {
rm.enqueueController(rc)
}
// Only need to get the old controller if the labels changed.
// Enqueue the oldRC before the curRC to give curRC a chance to adopt the oldPod.
if labelChanged {
// If the old and new rc are the same, the first one that syncs
// will set expectations preventing any damage from the second.
@ -381,6 +398,10 @@ func (rm *ReplicationManager) updatePod(old, cur interface{}) {
rm.enqueueController(oldRC)
}
}
if curRC := rm.getPodController(curPod); curRC != nil {
rm.enqueueController(curRC)
}
}
// When a pod is deleted, enqueue the controller that manages the pod and update its expectations.
@ -481,7 +502,21 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, rc *api.Re
for i := 0; i < diff; i++ {
go func() {
defer wg.Done()
if err := rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc); err != nil {
var err error
if rm.garbageCollectorEnabled {
var trueVar = true
controllerRef := &api.OwnerReference{
APIVersion: getRCKind().GroupVersion().String(),
Kind: getRCKind().Kind,
Name: rc.Name,
UID: rc.UID,
Controller: &trueVar,
}
err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
} else {
err = rm.podControl.CreatePods(rc.Namespace, rc.Spec.Template, rc)
}
if err != nil {
// Decrement the expected number of creates because the informer won't observe this pod
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", rc.Namespace, rc.Name)
rm.expectations.CreationObserved(rcKey)
@ -580,7 +615,6 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
trace.Step("ReplicationController restored")
rcNeedsSync := rm.expectations.SatisfiedExpectations(rcKey)
trace.Step("Expectations restored")
podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
@ -588,9 +622,59 @@ func (rm *ReplicationManager) syncReplicationController(key string) error {
}
trace.Step("Pods listed")
// TODO: Do this in a single pass, or use an index.
filteredPods := controller.FilterActivePods(podList.Items)
if rcNeedsSync {
// TODO: Do the List and Filter in a single pass, or use an index.
var filteredPods []*api.Pod
if rm.garbageCollectorEnabled {
// list all pods to include the pods that don't match the rc's selector
// anymore but still have a stale controllerRef.
podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Everything())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
return err
}
cm := controller.NewPodControllerRefManager(rm.podControl, rc.ObjectMeta, labels.Set(rc.Spec.Selector).AsSelector(), getRCKind())
matchesAndControlled, matchesNeedsController, controlledDoesNotMatch := cm.Classify(podList.Items)
for _, pod := range matchesNeedsController {
err := cm.AdoptPod(pod)
// continue to the next pod if adoption fails.
if err != nil {
// If the pod no longer exists, don't even log the error.
if !errors.IsNotFound(err) {
utilruntime.HandleError(err)
}
} else {
matchesAndControlled = append(matchesAndControlled, pod)
}
}
filteredPods = matchesAndControlled
// remove the controllerRef for the pods that no longer have matching labels
var errlist []error
for _, pod := range controlledDoesNotMatch {
err := cm.ReleasePod(pod)
if err != nil {
errlist = append(errlist, err)
}
}
if len(errlist) != 0 {
aggregate := utilerrors.NewAggregate(errlist)
// push the RC into the work queue again. We need to try to free the
// pods again, otherwise they will be stuck with the stale
// controllerRef.
rm.queue.Add(key)
return aggregate
}
} else {
podList, err := rm.podStore.Pods(rc.Namespace).List(labels.Set(rc.Spec.Selector).AsSelector())
if err != nil {
glog.Errorf("Error getting pods for rc %q: %v", key, err)
rm.queue.Add(key)
return err
}
filteredPods = controller.FilterActivePods(podList.Items)
}
if rcNeedsSync && rc.DeletionTimestamp == nil {
rm.manageReplicas(filteredPods, &rc)
}
trace.Step("manageReplicas done")

View File

@ -95,34 +95,45 @@ func newReplicationController(replicas int) *api.ReplicationController {
return rc
}
// create a pod with the given phase for the given rc (same selectors and namespace).
func newPod(name string, rc *api.ReplicationController, status api.PodPhase) *api.Pod {
return &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
Labels: rc.Spec.Selector,
Namespace: rc.Namespace,
},
Status: api.PodStatus{Phase: status},
}
}
// create count pods with the given phase for the given rc (same selectors and namespace), and add them to the store.
func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.ReplicationController, name string) *api.PodList {
pods := []api.Pod{}
var trueVar = true
controllerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar}
for i := 0; i < count; i++ {
newPod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%s%d", name, i),
Labels: rc.Spec.Selector,
Namespace: rc.Namespace,
},
Status: api.PodStatus{Phase: status},
}
pod := newPod(fmt.Sprintf("%s%d", name, i), rc, status)
pod.OwnerReferences = []api.OwnerReference{controllerReference}
if store != nil {
store.Add(&newPod)
store.Add(pod)
}
pods = append(pods, newPod)
pods = append(pods, *pod)
}
return &api.PodList{
Items: pods,
}
}
func validateSyncReplication(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes int) {
if len(fakePodControl.Templates) != expectedCreates {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.Templates))
func validateSyncReplication(t *testing.T, fakePodControl *controller.FakePodControl, expectedCreates, expectedDeletes, expectedPatches int) {
if e, a := expectedCreates, len(fakePodControl.Templates); e != a {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", e, a)
}
if len(fakePodControl.DeletePodName) != expectedDeletes {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.DeletePodName))
if e, a := expectedDeletes, len(fakePodControl.DeletePodName); e != a {
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", e, a)
}
if e, a := expectedPatches, len(fakePodControl.Patches); e != a {
t.Errorf("Unexpected number of patches. Expected %d, saw %d\n", e, a)
}
}
@ -148,7 +159,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
}
func TestSyncReplicationControllerDeletes(t *testing.T) {
@ -164,7 +175,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
newPodList(manager.podStore.Indexer, 2, api.PodRunning, controllerSpec, "pod")
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 1)
validateSyncReplication(t, &fakePodControl, 0, 1, 0)
}
func TestDeleteFinalStateUnknown(t *testing.T) {
@ -212,7 +223,7 @@ func TestSyncReplicationControllerCreates(t *testing.T) {
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 2, 0)
validateSyncReplication(t, &fakePodControl, 2, 0, 0)
}
func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
@ -238,7 +249,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
manager.podControl = &fakePodControl
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
if fakeHandler.RequestReceived != nil {
t.Errorf("Unexpected update when pods and rcs are in a steady state")
}
@ -297,7 +308,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
decRc := runtime.EncodeOrDie(testapi.Default.Codec(), rc)
fakeHandler.ValidateRequest(t, testapi.Default.ResourcePath(replicationControllerResourceName(), rc.Namespace, rc.Name)+"/status", "PUT", &decRc)
validateSyncReplication(t, &fakePodControl, 1, 0)
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
}
func TestSyncReplicationControllerDormancy(t *testing.T) {
@ -321,13 +332,13 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
// Creates a replica and sets expectations
controllerSpec.Status.Replicas = 1
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 1, 0)
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
// Expectations prevent replicas but not an update on status
controllerSpec.Status.Replicas = 0
fakePodControl.Clear()
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
// Get the key for the controller
rcKey, err := controller.KeyFunc(controllerSpec)
@ -336,19 +347,20 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
}
// Lowering expectations should lead to a sync that creates a replica, however the
// fakePodControl error will prevent this, leaving expectations at 0, 0
// fakePodControl error will prevent this, leaving expectations at 0, 0.
manager.expectations.CreationObserved(rcKey)
controllerSpec.Status.Replicas = 1
fakePodControl.Clear()
fakePodControl.Err = fmt.Errorf("Fake Error")
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
// This replica should not need a Lowering of expectations, since the previous create failed
fakePodControl.Clear()
fakePodControl.Err = nil
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 1, 0)
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
// 1 PUT for the rc status during dormancy window.
// Note that the pod creates go through pod control so they're not recorded.
@ -697,7 +709,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
expectedPods = burstReplicas
}
// This validates the rc manager sync actually created pods
validateSyncReplication(t, &fakePodControl, expectedPods, 0)
validateSyncReplication(t, &fakePodControl, expectedPods, 0, 0)
// This simulates the watch events for all but 1 of the expected pods.
// None of these should wake the controller because it has expectations==BurstReplicas.
@ -718,7 +730,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
if expectedPods > burstReplicas {
expectedPods = burstReplicas
}
validateSyncReplication(t, &fakePodControl, 0, expectedPods)
validateSyncReplication(t, &fakePodControl, 0, expectedPods, 0)
// To accurately simulate a watch we must delete the exact pods
// the rc is waiting for.
@ -753,7 +765,7 @@ func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int)
// Check that the rc didn't take any action for all the above pods
fakePodControl.Clear()
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
// Create/Delete the last pod
// The last add pod will decrease the expectation of the rc to 0,
@ -831,7 +843,7 @@ func TestRCSyncExpectations(t *testing.T) {
},
})
manager.syncReplicationController(getKey(controllerSpec, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
}
func TestDeleteControllerAndExpectations(t *testing.T) {
@ -847,7 +859,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
// This should set expectations for the rc
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 1, 0)
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
fakePodControl.Clear()
// Get the RC key
@ -873,7 +885,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
podExp.Add(-1, 0)
manager.podStore.Indexer.Replace(make([]interface{}, 0), "0")
manager.syncReplicationController(getKey(rc, t))
validateSyncReplication(t, &fakePodControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
}
func TestRCManagerNotReady(t *testing.T) {
@ -891,7 +903,7 @@ func TestRCManagerNotReady(t *testing.T) {
rcKey := getKey(controllerSpec, t)
manager.syncReplicationController(rcKey)
validateSyncReplication(t, &fakePodControl, 0, 0)
validateSyncReplication(t, &fakePodControl, 0, 0, 0)
queueRC, _ := manager.queue.Get()
if queueRC != rcKey {
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
@ -899,7 +911,7 @@ func TestRCManagerNotReady(t *testing.T) {
manager.podStoreSynced = alwaysReady
manager.syncReplicationController(rcKey)
validateSyncReplication(t, &fakePodControl, 1, 0)
validateSyncReplication(t, &fakePodControl, 1, 0, 0)
}
// shuffle returns a new shuffled list of container controllers.
@ -1116,3 +1128,193 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) {
}
}
}
// setupManagerWithGCEnabled creates an RC manager with a fakePodControl and with garbageCollectorEnabled set to true
func setupManagerWithGCEnabled() (manager *ReplicationManager, fakePodControl *controller.FakePodControl) {
c := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
fakePodControl = &controller.FakePodControl{}
manager = NewReplicationManagerFromClient(c, controller.NoResyncPeriodFunc, BurstReplicas, 0)
manager.garbageCollectorEnabled = true
manager.podStoreSynced = alwaysReady
manager.podControl = fakePodControl
return manager, fakePodControl
}
func TestDoNotPatchPodWithOtherControlRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
var trueVar = true
otherControllerReference := api.OwnerReference{UID: util.NewUUID(), APIVersion: "v1", Kind: "ReplicationController", Name: "AnotherRC", Controller: &trueVar}
// add to podStore a matching Pod controlled by another controller. Expect no patch.
pod := newPod("pod", rc, api.PodRunning)
pod.OwnerReferences = []api.OwnerReference{otherControllerReference}
manager.podStore.Indexer.Add(pod)
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
}
// because the matching pod already has a controller, 2 new pods should be created.
validateSyncReplication(t, fakePodControl, 2, 0, 0)
}
func TestPatchPodWithOtherOwnerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
// add to podStore one more matching pod that doesn't have a controller
// ref, but has an owner ref pointing to another object. Expect a patch to
// take control of it.
unrelatedOwnerReference := api.OwnerReference{UID: util.NewUUID(), APIVersion: "batch/v1", Kind: "Job", Name: "Job"}
pod := newPod("pod", rc, api.PodRunning)
pod.OwnerReferences = []api.OwnerReference{unrelatedOwnerReference}
manager.podStore.Indexer.Add(pod)
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
}
// 1 patch to take control of pod, and 1 create of new pod.
validateSyncReplication(t, fakePodControl, 1, 0, 1)
}
func TestPatchPodWithCorrectOwnerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
// add to podStore a matching pod that has an ownerRef pointing to the rc,
// but ownerRef.Controller is not set. Expect a patch to take control of it.
rcOwnerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name}
pod := newPod("pod", rc, api.PodRunning)
pod.OwnerReferences = []api.OwnerReference{rcOwnerReference}
manager.podStore.Indexer.Add(pod)
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
}
// 1 patch to take control of pod, and 1 create of new pod.
validateSyncReplication(t, fakePodControl, 1, 0, 1)
}
func TestPatchPodFails(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
// add to podStore two matching pods. Expect two patches to take control of
// them.
manager.podStore.Indexer.Add(newPod("pod1", rc, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod2", rc, api.PodRunning))
// let both patches fail. The rc manager will assume it failed to take
// control of the pods and will create new ones.
fakePodControl.Err = fmt.Errorf("Fake Error")
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
}
// 2 patches to take control of pod1 and pod2 (both fail), 2 creates.
validateSyncReplication(t, fakePodControl, 2, 0, 2)
}
func TestPatchExtraPodsThenDelete(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
// add to podStore three matching pods. Expect three patches to take control
// of them, and later delete one of them.
manager.podStore.Indexer.Add(newPod("pod1", rc, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod2", rc, api.PodRunning))
manager.podStore.Indexer.Add(newPod("pod3", rc, api.PodRunning))
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
}
// 3 patches to take control of the pods, and 1 deletion because there is an extra pod.
validateSyncReplication(t, fakePodControl, 0, 1, 3)
}
func TestUpdateLabelsRemoveControllerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
manager.rcStore.Indexer.Add(rc)
// put one pod in the podStore
pod := newPod("pod", rc, api.PodRunning)
var trueVar = true
rcOwnerReference := api.OwnerReference{UID: rc.UID, APIVersion: "v1", Kind: "ReplicationController", Name: rc.Name, Controller: &trueVar}
pod.OwnerReferences = []api.OwnerReference{rcOwnerReference}
updatedPod := *pod
// reset the labels
updatedPod.Labels = make(map[string]string)
// add the updatedPod to the store. This is consistent with the behavior of
// the Informer: the Informer updates the store before calling the handler
// (updatePod() in this case).
manager.podStore.Indexer.Add(&updatedPod)
// send an update of the same pod with modified labels
manager.updatePod(pod, &updatedPod)
// verifies that the rc is added to the queue
rcKey := getKey(rc, t)
queueRC, _ := manager.queue.Get()
if queueRC != rcKey {
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
}
manager.queue.Done(queueRC)
err := manager.syncReplicationController(rcKey)
if err != nil {
t.Fatal(err)
}
// expect 1 patch to be sent to remove the controllerRef for the pod.
// expect 2 creates because the rc.Spec.Replicas=2 and there exists no
// matching pod.
validateSyncReplication(t, fakePodControl, 2, 0, 1)
fakePodControl.Clear()
}
func TestUpdateSelectorControllerRef(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
// put 2 pods in the podStore
newPodList(manager.podStore.Indexer, 2, api.PodRunning, rc, "pod")
// update the RC so that its selector no longer matches the pods
updatedRC := *rc
updatedRC.Spec.Selector = map[string]string{"foo": "baz"}
// put the updatedRC into the store. This is consistent with the behavior of
// the Informer: the Informer updates the store before calling the handler
// (updateRC() in this case).
manager.rcStore.Indexer.Add(&updatedRC)
manager.updateRC(rc, &updatedRC)
// verifies that the rc is added to the queue
rcKey := getKey(rc, t)
queueRC, _ := manager.queue.Get()
if queueRC != rcKey {
t.Fatalf("Expected to find key %v in queue, found %v", rcKey, queueRC)
}
manager.queue.Done(queueRC)
err := manager.syncReplicationController(rcKey)
if err != nil {
t.Fatal(err)
}
// expect 2 patches to be sent to remove the controllerRef for the pods.
// expect 2 creates because the rc.Spec.Replicas=2 and there exists no
// matching pod.
validateSyncReplication(t, fakePodControl, 2, 0, 2)
fakePodControl.Clear()
}
// RC manager shouldn't adopt or create more pods if the rc is about to be
// deleted.
func TestDoNotAdoptOrCreateIfBeingDeleted(t *testing.T) {
manager, fakePodControl := setupManagerWithGCEnabled()
rc := newReplicationController(2)
now := unversioned.Now()
rc.DeletionTimestamp = &now
manager.rcStore.Indexer.Add(rc)
pod1 := newPod("pod1", rc, api.PodRunning)
manager.podStore.Indexer.Add(pod1)
// no patch, no create
err := manager.syncReplicationController(getKey(rc, t))
if err != nil {
t.Fatal(err)
}
validateSyncReplication(t, fakePodControl, 0, 0, 0)
}

View File

@ -0,0 +1,443 @@
// +build integration,!no-etcd
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package integration
import (
"fmt"
"net/http/httptest"
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
internalclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3"
"k8s.io/kubernetes/pkg/client/restclient"
controllerframework "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/controller/framework/informers"
"k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/integration/framework"
)
func testLabels() map[string]string {
return map[string]string{"name": "test"}
}
func newRC(name, namespace string, replicas int) *v1.ReplicationController {
replicasCopy := int32(replicas)
return &v1.ReplicationController{
TypeMeta: unversioned.TypeMeta{
Kind: "ReplicationController",
APIVersion: "v1",
},
ObjectMeta: v1.ObjectMeta{
Namespace: namespace,
Name: name,
},
Spec: v1.ReplicationControllerSpec{
Selector: testLabels(),
Replicas: &replicasCopy,
Template: &v1.PodTemplateSpec{
ObjectMeta: v1.ObjectMeta{
Labels: testLabels(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fake-name",
Image: "fakeimage",
},
},
},
},
},
}
}
func newMatchingPod(podName, namespace string) *v1.Pod {
return &v1.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: v1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: testLabels(),
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fake-name",
Image: "fakeimage",
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
},
}
}
// verifyRemainingObjects verifies whether the numbers of remaining replication
// controllers and pods are rcNum and podNum. It returns an error if the
// communication with the API server fails.
func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespace string, rcNum, podNum int) (bool, error) {
rcClient := clientSet.Core().ReplicationControllers(namespace)
podClient := clientSet.Core().Pods(namespace)
pods, err := podClient.List(api.ListOptions{})
if err != nil {
return false, fmt.Errorf("Failed to list pods: %v", err)
}
var ret = true
if len(pods.Items) != podNum {
ret = false
t.Logf("expect %d pods, got %d pods", podNum, len(pods.Items))
}
rcs, err := rcClient.List(api.ListOptions{})
if err != nil {
return false, fmt.Errorf("Failed to list replication controllers: %v", err)
}
if len(rcs.Items) != rcNum {
ret = false
t.Logf("expect %d RCs, got %d RCs", rcNum, len(rcs.Items))
}
return ret, nil
}
func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replication.ReplicationManager, controllerframework.SharedIndexInformer, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.EnableCoreControllers = false
_, s := framework.RunAMaster(masterConfig)
config := restclient.Config{Host: s.URL}
clientSet, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("Error in create clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
resyncPeriodFunc := func() time.Duration {
return resyncPeriod
}
podInformer := informers.CreateSharedPodIndexInformer(internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "pod-informer")), resyncPeriod)
rm := replication.NewReplicationManager(
podInformer,
internalclientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "replication-controller")),
resyncPeriodFunc,
replication.BurstReplicas,
4096,
enableGarbageCollector,
)
return s, rm, podInformer, clientSet
}
func TestAdoption(t *testing.T) {
var trueVar = true
testCases := []struct {
name string
existingOwnerReferences func(rc *v1.ReplicationController) []v1.OwnerReference
expectedOwnerReferences func(rc *v1.ReplicationController) []v1.OwnerReference
}{
{
"pod refers rc as an owner, not a controller",
func(rc *v1.ReplicationController) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController"}}
},
func(rc *v1.ReplicationController) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar}}
},
},
{
"pod doesn't have owner references",
func(rc *v1.ReplicationController) []v1.OwnerReference {
return []v1.OwnerReference{}
},
func(rc *v1.ReplicationController) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar}}
},
},
{
"pod refers rc as a controller",
func(rc *v1.ReplicationController) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar}}
},
func(rc *v1.ReplicationController) []v1.OwnerReference {
return []v1.OwnerReference{{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar}}
},
},
{
"pod refers other rc as the controller, refers the rc as an owner",
func(rc *v1.ReplicationController) []v1.OwnerReference {
return []v1.OwnerReference{
{UID: "1", Name: "anotherRC", APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar},
{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController"},
}
},
func(rc *v1.ReplicationController) []v1.OwnerReference {
return []v1.OwnerReference{
{UID: "1", Name: "anotherRC", APIVersion: "v1", Kind: "ReplicationController", Controller: &trueVar},
{UID: rc.UID, Name: rc.Name, APIVersion: "v1", Kind: "ReplicationController"},
}
},
},
}
for i, tc := range testCases {
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace(fmt.Sprintf("adoption-%d", i), s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rcClient := clientSet.Core().ReplicationControllers(ns.Name)
podClient := clientSet.Core().Pods(ns.Name)
const rcName = "rc"
rc, err := rcClient.Create(newRC(rcName, ns.Name, 1))
if err != nil {
t.Fatalf("Failed to create replication controller: %v", err)
}
pod := newMatchingPod("pod1", ns.Name)
pod.OwnerReferences = tc.existingOwnerReferences(rc)
_, err = podClient.Create(pod)
if err != nil {
t.Fatalf("Failed to create Pod: %v", err)
}
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
updatedPod, err := podClient.Get(pod.Name)
if err != nil {
return false, err
}
if e, a := tc.expectedOwnerReferences(rc), updatedPod.OwnerReferences; reflect.DeepEqual(e, a) {
return true, nil
} else {
t.Logf("ownerReferences don't match, expect %v, got %v", e, a)
return false, nil
}
}); err != nil {
t.Fatal(err)
}
close(stopCh)
}
}
func createRCsPods(t *testing.T, clientSet clientset.Interface, rcs []*v1.ReplicationController, pods []*v1.Pod, ns string) {
rcClient := clientSet.Core().ReplicationControllers(ns)
podClient := clientSet.Core().Pods(ns)
for _, rc := range rcs {
if _, err := rcClient.Create(rc); err != nil {
t.Fatalf("Failed to create replication controller %s: %v", rc.Name, err)
}
}
for _, pod := range pods {
if _, err := podClient.Create(pod); err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}
}
}
func waitRCStable(t *testing.T, clientSet clientset.Interface, rc *v1.ReplicationController, ns string) {
rcClient := clientSet.Core().ReplicationControllers(ns)
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
updatedRC, err := rcClient.Get(rc.Name)
if err != nil {
return false, err
}
if updatedRC.Status.Replicas != *rc.Spec.Replicas {
return false, nil
} else {
return true, nil
}
}); err != nil {
t.Fatal(err)
}
}
func TestUpdateSelectorToAdopt(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=1. At first rc.Selector
// matches pod1 only; change the selector to match pod2 as well. Verify
// there is only one pod left.
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace("update-selector-to-adopt", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 1)
// let rc's selector only match pod1
rc.Spec.Selector["uniqueKey"] = "1"
rc.Spec.Template.Labels["uniqueKey"] = "1"
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
// change the rc's selector to match both pods
patch := `{"spec":{"selector":{"uniqueKey":null}}}`
rcClient := clientSet.Core().ReplicationControllers(ns.Name)
rc, err := rcClient.Patch(rc.Name, api.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch replication controller: %v", err)
}
t.Logf("patched rc = %#v", rc)
// wait for the rc to select both pods and delete one of them
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1)
}); err != nil {
t.Fatal(err)
}
close(stopCh)
}
func TestUpdateSelectorToRemoveControllerRef(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=2. At first rc.Selector
// matches pod1 and pod2; change the selector to match only pod1. Verify
// that rc creates one more pod, so there are 3 pods. Also verify that
// pod2's controllerRef is cleared.
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace("update-selector-to-remove-controllerref", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 2)
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
// change the rc's selector so that it matches pod1 only
patch := `{"spec":{"selector":{"uniqueKey":"1"},"template":{"metadata":{"labels":{"uniqueKey":"1"}}}}}`
rcClient := clientSet.Core().ReplicationControllers(ns.Name)
rc, err := rcClient.Patch(rc.Name, api.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch replication controller: %v", err)
}
t.Logf("patched rc = %#v", rc)
// wait for the rc to create one more pod
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3)
}); err != nil {
t.Fatal(err)
}
podClient := clientSet.Core().Pods(ns.Name)
pod2, err = podClient.Get(pod2.Name)
if err != nil {
t.Fatalf("Failed to get pod2: %v", err)
}
if len(pod2.OwnerReferences) != 0 {
t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences)
}
close(stopCh)
}
func TestUpdateLabelToRemoveControllerRef(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=2. At first rc.Selector
// matches pod1 and pod2; change pod2's labels so they no longer match. Verify
// that rc creates one more pod, so there are 3 pods. Also verify that
// pod2's controllerRef is cleared.
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace("update-label-to-remove-controllerref", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 2)
pod1 := newMatchingPod("pod1", ns.Name)
pod2 := newMatchingPod("pod2", ns.Name)
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
// change pod2's labels so they no longer match the rc's selector
patch := `{"metadata":{"labels":{"name":null}}}`
podClient := clientSet.Core().Pods(ns.Name)
pod2, err := podClient.Patch(pod2.Name, api.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch pod2: %v", err)
}
t.Logf("patched pod2 = %#v", pod2)
// wait for the rc to create one more pod
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 3)
}); err != nil {
t.Fatal(err)
}
pod2, err = podClient.Get(pod2.Name)
if err != nil {
t.Fatalf("Failed to get pod2: %v", err)
}
if len(pod2.OwnerReferences) != 0 {
t.Fatalf("ownerReferences of pod2 is not cleared, got %#v", pod2.OwnerReferences)
}
close(stopCh)
}
func TestUpdateLabelToBeAdopted(t *testing.T) {
// We have pod1, pod2 and rc. rc.spec.replicas=1. At first rc.Selector
// matches pod1 only; change pod2's labels to match it. Verify the RC
// manager adopts pod2 and deletes one of the pods, so there is only 1 pod
// left.
s, rm, podInformer, clientSet := rmSetup(t, true)
ns := framework.CreateTestingNamespace("update-label-to-be-adopted", s, t)
defer framework.DeleteTestingNamespace(ns, s, t)
rc := newRC("rc", ns.Name, 1)
// let rc's selector match only pod1
rc.Spec.Selector["uniqueKey"] = "1"
rc.Spec.Template.Labels["uniqueKey"] = "1"
pod1 := newMatchingPod("pod1", ns.Name)
pod1.Labels["uniqueKey"] = "1"
pod2 := newMatchingPod("pod2", ns.Name)
pod2.Labels["uniqueKey"] = "2"
createRCsPods(t, clientSet, []*v1.ReplicationController{rc}, []*v1.Pod{pod1, pod2}, ns.Name)
stopCh := make(chan struct{})
go podInformer.Run(stopCh)
go rm.Run(5, stopCh)
waitRCStable(t, clientSet, rc, ns.Name)
// change pod2's labels to match the rc's selector
patch := `{"metadata":{"labels":{"uniqueKey":"1"}}}`
podClient := clientSet.Core().Pods(ns.Name)
pod2, err := podClient.Patch(pod2.Name, api.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch pod2: %v", err)
}
t.Logf("patched pod2 = %#v", pod2)
// wait for the rc to select both pods and delete one of them
if err := wait.Poll(10*time.Second, 60*time.Second, func() (bool, error) {
return verifyRemainingObjects(t, clientSet, ns.Name, 1, 1)
}); err != nil {
t.Fatal(err)
}
close(stopCh)
}