diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 10b0ca25605..f79468b811e 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -216,7 +216,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st controllerManager := replicationControllerPkg.NewReplicationManager(cl) // TODO: Write an integration test for the replication controllers watch. - controllerManager.Run(1 * time.Second) + go controllerManager.Run(3, util.NeverStop) nodeResources := &api.NodeResources{ Capacity: api.ResourceList{ diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index b3f052cdd68..9325d4e0fb9 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -55,6 +55,7 @@ type CMServer struct { CloudProvider string CloudConfigFile string ConcurrentEndpointSyncs int + ConcurrentRCSyncs int MinionRegexp string NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -90,6 +91,7 @@ func NewCMServer() *CMServer { Port: ports.ControllerManagerPort, Address: util.IP(net.ParseIP("127.0.0.1")), ConcurrentEndpointSyncs: 5, + ConcurrentRCSyncs: 5, NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, NamespaceSyncPeriod: 5 * time.Minute, @@ -111,6 +113,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load") fs.StringVar(&s.MinionRegexp, "minion-regexp", s.MinionRegexp, "If non empty, and --cloud-provider is specified, a regular expression for matching minion VMs.") fs.DurationVar(&s.NodeSyncPeriod, "node-sync-period", s.NodeSyncPeriod, ""+ "The period for syncing nodes from cloudprovider. 
Longer periods will result in "+ @@ -207,7 +210,7 @@ func (s *CMServer) Run(_ []string) error { go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient) - controllerManager.Run(replicationControllerPkg.DefaultSyncPeriod) + go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) nodeResources := &api.NodeResources{ diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 63b4bdddc7f..fceeb25d71c 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -144,7 +144,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, go endpoints.Run(5, util.NeverStop) controllerManager := controller.NewReplicationManager(cl) - controllerManager.Run(controller.DefaultSyncPeriod) + go controllerManager.Run(5, util.NeverStop) } func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP, port int) { diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 0afd64b8661..31d495dbc56 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -123,6 +123,59 @@ func (s *StoreToNodeLister) GetNodeInfo(id string) (*api.Node, error) { return minion.(*api.Node), nil } +// StoreToControllerLister gives a store List and Exists methods. The store must contain only ReplicationControllers. +type StoreToControllerLister struct { + Store +} + +// Exists checks if the given rc exists in the store. +func (s *StoreToControllerLister) Exists(controller *api.ReplicationController) (bool, error) { + _, exists, err := s.Store.Get(controller) + if err != nil { + return false, err + } + return exists, nil +} + +// StoreToControllerLister lists all controllers in the store. +// TODO: converge on the interface in pkg/client +func (s *StoreToControllerLister) List() (controllers []api.ReplicationController, err error) { + for _, c := range s.Store.List() { + controllers = append(controllers, *(c.(*api.ReplicationController))) + } + return controllers, nil +} + +// GetPodControllers returns a list of controllers managing a pod. Returns an error only if no matching controllers are found. +func (s *StoreToControllerLister) GetPodControllers(pod *api.Pod) (controllers []api.ReplicationController, err error) { + var selector labels.Selector + var rc api.ReplicationController + + if len(pod.Labels) == 0 { + err = fmt.Errorf("No controllers found for pod %v because it has no labels", pod.Name) + return + } + + for _, m := range s.Store.List() { + rc = *m.(*api.ReplicationController) + if rc.Namespace != pod.Namespace { + continue + } + labelSet := labels.Set(rc.Spec.Selector) + selector = labels.Set(rc.Spec.Selector).AsSelector() + + // If an rc with a nil or empty selector creeps in, it should match nothing, not everything. + if labelSet.AsSelector().Empty() || !selector.Matches(labels.Set(pod.Labels)) { + continue + } + controllers = append(controllers, rc) + } + if len(controllers) == 0 { + err = fmt.Errorf("Could not find controllers for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels) + } + return +} + // StoreToServiceLister makes a Store that has the List method of the client.ServiceInterface // The Store must contain (only) Services. 
type StoreToServiceLister struct { diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index 5e57208559a..4891e03d476 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -45,6 +45,116 @@ func TestStoreToMinionLister(t *testing.T) { } } +func TestStoreToControllerLister(t *testing.T) { + store := NewStore(MetaNamespaceKeyFunc) + lister := StoreToControllerLister{store} + testCases := []struct { + inRCs []*api.ReplicationController + list func() ([]api.ReplicationController, error) + outRCNames util.StringSet + expectErr bool + }{ + // Basic listing with all labels and no selectors + { + inRCs: []*api.ReplicationController{ + {ObjectMeta: api.ObjectMeta{Name: "basic"}}, + }, + list: func() ([]api.ReplicationController, error) { + return lister.List() + }, + outRCNames: util.NewStringSet("basic"), + }, + // No pod lables + { + inRCs: []*api.ReplicationController{ + { + ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, + Spec: api.ReplicationControllerSpec{ + Selector: map[string]string{"foo": "baz"}, + }, + }, + }, + list: func() ([]api.ReplicationController, error) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"}, + } + return lister.GetPodControllers(pod) + }, + outRCNames: util.NewStringSet(), + expectErr: true, + }, + // No RC selectors + { + inRCs: []*api.ReplicationController{ + { + ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"}, + }, + }, + list: func() ([]api.ReplicationController, error) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod1", + Namespace: "ns", + Labels: map[string]string{"foo": "bar"}, + }, + } + return lister.GetPodControllers(pod) + }, + outRCNames: util.NewStringSet(), + expectErr: true, + }, + // Matching labels to selectors and namespace + { + inRCs: []*api.ReplicationController{ + { + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: api.ReplicationControllerSpec{ + Selector: map[string]string{"foo": "bar"}, + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"}, + Spec: api.ReplicationControllerSpec{ + Selector: map[string]string{"foo": "bar"}, + }, + }, + }, + list: func() ([]api.ReplicationController, error) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod1", + Labels: map[string]string{"foo": "bar"}, + Namespace: "ns", + }, + } + return lister.GetPodControllers(pod) + }, + outRCNames: util.NewStringSet("bar"), + }, + } + for _, c := range testCases { + for _, r := range c.inRCs { + store.Add(r) + } + + gotControllers, err := c.list() + if err != nil && c.expectErr { + continue + } else if c.expectErr { + t.Fatalf("Expected error, got none") + } else if err != nil { + t.Fatalf("Unexpected error %#v", err) + } + gotNames := make([]string, len(gotControllers)) + for ix := range gotControllers { + gotNames[ix] = gotControllers[ix].Name + } + if !c.outRCNames.HasAll(gotNames...) || len(gotNames) != len(c.outRCNames) { + t.Errorf("Unexpected got controllers %+v expected %+v", gotNames, c.outRCNames) + } + } +} + func TestStoreToPodLister(t *testing.T) { store := NewStore(MetaNamespaceKeyFunc) ids := []string{"foo", "bar", "baz"} diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go new file mode 100644 index 00000000000..7947cda8093 --- /dev/null +++ b/pkg/controller/controller_utils.go @@ -0,0 +1,291 @@ +/* +Copyright 2014 Google Inc. All rights reserved. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "encoding/json" + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" + "github.com/golang/glog" + "sync/atomic" + "time" +) + +const CreatedByAnnotation = "kubernetes.io/created-by" + +// Expectations are a way for replication controllers to tell the rc manager what they expect. eg: +// RCExpectations: { +// rc1: expects 2 adds in 2 minutes +// rc2: expects 2 dels in 2 minutes +// rc3: expects -1 adds in 2 minutes => rc3's expectations have already been met +// } +// +// Implementation: +// PodExpectation = pair of atomic counters to track pod creation/deletion +// RCExpectationsStore = TTLStore + a PodExpectation per rc +// +// * Once set, expectations can only be lowered +// * An RC isn't synced till its expectations are either fulfilled or expire +// * Rcs that don't set expectations will get woken up for every matching pod + +// expKeyFunc parses out the key from a PodExpectation +var expKeyFunc = func(obj interface{}) (string, error) { + if e, ok := obj.(*PodExpectations); ok { + return e.key, nil + } + return "", fmt.Errorf("Could not find key for obj %#v", obj) +} + +// RCExpectations is a ttl cache mapping rcs to what they expect to see before being woken up for a sync. +type RCExpectations struct { + cache.Store +} + +// GetExpectations returns the PodExpectations of the given rc. +func (r *RCExpectations) GetExpectations(rc *api.ReplicationController) (*PodExpectations, bool, error) { + rcKey, err := rcKeyFunc(rc) + if err != nil { + return nil, false, err + } + if podExp, exists, err := r.GetByKey(rcKey); err == nil && exists { + return podExp.(*PodExpectations), true, nil + } else { + return nil, false, err + } +} + +// SatisfiedExpectations returns true if the replication manager has observed the required adds/dels +// for the given rc. Add/del counts are established by the rc at sync time, and updated as pods +// are observed by the replication manager's podController. +func (r *RCExpectations) SatisfiedExpectations(rc *api.ReplicationController) bool { + if podExp, exists, err := r.GetExpectations(rc); exists { + if podExp.Fulfilled() { + return true + } else { + glog.V(4).Infof("Controller %v still waiting on expectations %#v", rc.Name, podExp) + return false + } + } else if err != nil { + glog.V(2).Infof("Error encountered while checking expectations %#v, forcing sync", err) + } else { + // When a new rc is created, it doesn't have expectations. + // When it doesn't see expected watch events for > TTL, the expectations expire. + // - In this case it wakes up, creates/deletes pods, and sets expectations again. 
+ + // When it has satisfied expectations and no pods need to be created/destroyed > TTL, the expectations expire. + // - In this case it continues without setting expectations till it needs to create/delete pods. + glog.V(4).Infof("Controller %v either never recorded expectations, or the ttl expired.", rc.Name) + } + // Trigger a sync if we either encountered an error (which shouldn't happen since we're + // getting from local store) or this rc hasn't established expectations. + return true +} + +// setExpectations registers new expectations for the given rc. Forgets existing expectations. +func (r *RCExpectations) setExpectations(rc *api.ReplicationController, add, del int) error { + rcKey, err := rcKeyFunc(rc) + if err != nil { + return err + } + return r.Add(&PodExpectations{add: int64(add), del: int64(del), key: rcKey}) +} + +func (r *RCExpectations) ExpectCreations(rc *api.ReplicationController, adds int) error { + return r.setExpectations(rc, adds, 0) +} + +func (r *RCExpectations) ExpectDeletions(rc *api.ReplicationController, dels int) error { + return r.setExpectations(rc, 0, dels) +} + +// Decrements the expectation counts of the given rc. +func (r *RCExpectations) lowerExpectations(rc *api.ReplicationController, add, del int) { + if podExp, exists, err := r.GetExpectations(rc); err == nil && exists { + if podExp.add > 0 && podExp.del > 0 { + glog.V(2).Infof("Controller has both add and del expectations %+v", podExp) + } + podExp.Seen(int64(add), int64(del)) + } +} + +// CreationObserved atomically decrements the `add` expectation count of the given replication controller. +func (r *RCExpectations) CreationObserved(rc *api.ReplicationController) { + r.lowerExpectations(rc, 1, 0) +} + +// DeletionObserved atomically decrements the `del` expectation count of the given replication controller. +func (r *RCExpectations) DeletionObserved(rc *api.ReplicationController) { + r.lowerExpectations(rc, 0, 1) +} + +// Expectations are either fulfilled, or expire naturally. +type Expectations interface { + Fulfilled() bool +} + +// PodExpectations track pod creates/deletes. +type PodExpectations struct { + add int64 + del int64 + key string +} + +// Seen decrements the add and del counters. +func (e *PodExpectations) Seen(add, del int64) { + atomic.AddInt64(&e.add, -add) + atomic.AddInt64(&e.del, -del) +} + +// Fulfilled returns true if this expectation has been fulfilled. +func (e *PodExpectations) Fulfilled() bool { + // TODO: think about why this line being atomic doesn't matter + return atomic.LoadInt64(&e.add) <= 0 && atomic.LoadInt64(&e.del) <= 0 +} + +// NewRCExpectations returns a store for PodExpectations. +func NewRCExpectations() *RCExpectations { + return &RCExpectations{cache.NewTTLStore(expKeyFunc, ExpectationsTimeout)} +} + +// PodControlInterface is an interface that knows how to add or delete pods +// created as an interface to allow testing. +type PodControlInterface interface { + // createReplica creates new replicated pods according to the spec. + createReplica(namespace string, controller *api.ReplicationController) error + // deletePod deletes the pod identified by podID. + deletePod(namespace string, podID string) error +} + +// RealPodControl is the default implementation of PodControlInterface. 
+type RealPodControl struct { + kubeClient client.Interface + recorder record.EventRecorder +} + +func (r RealPodControl) createReplica(namespace string, controller *api.ReplicationController) error { + desiredLabels := make(labels.Set) + for k, v := range controller.Spec.Template.Labels { + desiredLabels[k] = v + } + desiredAnnotations := make(labels.Set) + for k, v := range controller.Spec.Template.Annotations { + desiredAnnotations[k] = v + } + + createdByRef, err := api.GetReference(controller) + if err != nil { + return fmt.Errorf("unable to get controller reference: %v", err) + } + // TODO: Version this serialization per #7322 + createdByRefJson, err := json.Marshal(createdByRef) + if err != nil { + return fmt.Errorf("unable to serialize controller reference: %v", err) + } + + desiredAnnotations[CreatedByAnnotation] = string(createdByRefJson) + + // use the dash (if the name isn't too long) to make the pod name a bit prettier + prefix := fmt.Sprintf("%s-", controller.Name) + if ok, _ := validation.ValidatePodName(prefix, true); !ok { + prefix = controller.Name + } + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Labels: desiredLabels, + Annotations: desiredAnnotations, + GenerateName: prefix, + }, + } + if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.Spec); err != nil { + return fmt.Errorf("unable to convert pod template: %v", err) + } + if labels.Set(pod.Labels).AsSelector().Empty() { + return fmt.Errorf("unable to create pod replica, no labels") + } + if newPod, err := r.kubeClient.Pods(namespace).Create(pod); err != nil { + r.recorder.Eventf(controller, "failedCreate", "Error creating: %v", err) + return fmt.Errorf("unable to create pod replica: %v", err) + } else { + glog.V(4).Infof("Controller %v created pod %v", controller.Name, newPod.Name) + r.recorder.Eventf(controller, "successfulCreate", "Created pod: %v", newPod.Name) + } + return nil +} + +func (r RealPodControl) deletePod(namespace, podID string) error { + return r.kubeClient.Pods(namespace).Delete(podID) +} + +// activePods type allows custom sorting of pods so an rc can pick the best ones to delete. +type activePods []*api.Pod + +func (s activePods) Len() int { return len(s) } +func (s activePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s activePods) Less(i, j int) bool { + // Unassigned < assigned + if s[i].Spec.Host == "" && s[j].Spec.Host != "" { + return true + } + // PodPending < PodUnknown < PodRunning + m := map[api.PodPhase]int{api.PodPending: 0, api.PodUnknown: 1, api.PodRunning: 2} + if m[s[i].Status.Phase] != m[s[j].Status.Phase] { + return m[s[i].Status.Phase] < m[s[j].Status.Phase] + } + // Not ready < ready + if !api.IsPodReady(s[i]) && api.IsPodReady(s[j]) { + return true + } + return false +} + +// filterActivePods returns pods that have not terminated. +func filterActivePods(pods []api.Pod) []*api.Pod { + var result []*api.Pod + for i := range pods { + if api.PodSucceeded != pods[i].Status.Phase && + api.PodFailed != pods[i].Status.Phase { + result = append(result, &pods[i]) + } + } + return result +} + +// updateReplicaCount attempts to update the Status.Replicas of the given controller, with retries. +// Note that the controller pointer might contain a more recent version of the same controller passed into the function. 
+func updateReplicaCount(rcClient client.ReplicationControllerInterface, controller *api.ReplicationController, numReplicas int) error { + return wait.Poll(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) { + if controller.Status.Replicas != numReplicas { + glog.V(4).Infof("Updating replica count for rc: %v, %d->%d", controller.Name, controller.Status.Replicas, numReplicas) + controller.Status.Replicas = numReplicas + _, err := rcClient.Update(controller) + if err != nil { + glog.V(2).Infof("Controller %v failed to update replica count: %v", controller.Name, err) + // Update the controller with the latest resource version for the next poll + controller, _ = rcClient.Get(controller.Name) + return false, err + } + } + return true, nil + }) +} diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 0fc6c2aad46..c747cd79ce5 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -17,107 +17,67 @@ limitations under the License. package controller import ( - "encoding/json" - "fmt" + "reflect" "sort" "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) +var ( + rcKeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc +) + +const ( + // We'll attempt to recompute the required replicas of all replication controllers + // that have fulfilled their expectations at least this often. + FullControllerResyncPeriod = 30 * time.Second + + // If a watch misdelivers info about a pod, it'll take this long + // to rectify the number of replicas. + PodRelistPeriod = 5 * time.Minute + + // If a watch drops an (add, delete) event for a pod, it'll take this long + // before a dormant rc waiting for those packets is woken up anyway. This + // should typically be somewhere between the PodRelistPeriod and the + // FullControllerResyncPeriod. It is specifically targeted at the case + // where some problem prevents an update of expectations; without it the + // RC could stay asleep forever. + ExpectationsTimeout = 2 * time.Minute +) + // ReplicationManager is responsible for synchronizing ReplicationController objects stored // in the system with actual running pods. type ReplicationManager struct { kubeClient client.Interface podControl PodControlInterface - syncTime <-chan time.Time // To allow injection of syncReplicationController for testing. - syncHandler func(controller api.ReplicationController) error -} - -// PodControlInterface is an interface that knows how to add or delete pods -// created as an interface to allow testing. -type PodControlInterface interface { - // createReplica creates new replicated pods according to the spec. - createReplica(namespace string, controller api.ReplicationController) - // deletePod deletes the pod identified by podID. 
- deletePod(namespace string, podID string) error -} - -// RealPodControl is the default implementation of PodControllerInterface. -type RealPodControl struct { - kubeClient client.Interface - recorder record.EventRecorder -} - -// Time period of main replication controller sync loop -const DefaultSyncPeriod = 5 * time.Second -const CreatedByAnnotation = "kubernetes.io/created-by" - -func (r RealPodControl) createReplica(namespace string, controller api.ReplicationController) { - desiredLabels := make(labels.Set) - for k, v := range controller.Spec.Template.Labels { - desiredLabels[k] = v - } - desiredAnnotations := make(labels.Set) - for k, v := range controller.Spec.Template.Annotations { - desiredAnnotations[k] = v - } - - createdByRef, err := api.GetReference(&controller) - if err != nil { - util.HandleError(fmt.Errorf("unable to get controller reference: %v", err)) - return - } - - createdByRefJson, err := json.Marshal(createdByRef) - if err != nil { - util.HandleError(fmt.Errorf("unable to serialize controller reference: %v", err)) - return - } - - desiredAnnotations[CreatedByAnnotation] = string(createdByRefJson) - - // use the dash (if the name isn't too long) to make the pod name a bit prettier - prefix := fmt.Sprintf("%s-", controller.Name) - if ok, _ := validation.ValidatePodName(prefix, true); !ok { - prefix = controller.Name - } - - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Labels: desiredLabels, - Annotations: desiredAnnotations, - GenerateName: prefix, - }, - } - if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.Spec); err != nil { - util.HandleError(fmt.Errorf("unable to convert pod template: %v", err)) - return - } - if labels.Set(pod.Labels).AsSelector().Empty() { - util.HandleError(fmt.Errorf("unable to create pod replica, no labels")) - return - } - if _, err := r.kubeClient.Pods(namespace).Create(pod); err != nil { - r.recorder.Eventf(&controller, "failedCreate", "Error creating: %v", err) - util.HandleError(fmt.Errorf("unable to create pod replica: %v", err)) - } -} - -func (r RealPodControl) deletePod(namespace, podID string) error { - return r.kubeClient.Pods(namespace).Delete(podID) + syncHandler func(rcKey string) error + // A TTLCache of pod creates/deletes each rc expects to see + expectations *RCExpectations + // A store of controllers, populated by the rcController + controllerStore cache.StoreToControllerLister + // A store of pods, populated by the podController + podStore cache.StoreToPodLister + // Watches changes to all replication controllers + rcController *framework.Controller + // Watches changes to all pods + podController *framework.Controller + // Controllers that need to be updated + queue *workqueue.Type } // NewReplicationManager creates a new ReplicationManager. 
@@ -131,181 +91,254 @@ func NewReplicationManager(kubeClient client.Interface) *ReplicationManager { kubeClient: kubeClient, recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}), }, + expectations: NewRCExpectations(), + queue: workqueue.New(), } + rm.controllerStore.Store, rm.rcController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.ReplicationController{}, + FullControllerResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: rm.enqueueController, + UpdateFunc: func(old, cur interface{}) { + // We only really need to do this when spec changes, but for correctness it is safer to + // periodically double check. It is overkill for 2 reasons: + // 1. Status.Replicas updates will cause a sync + // 2. Every 30s we will get a full resync (this will happen anyway every 5 minutes when pods relist) + // However, it shouldn't be that bad as rcs that haven't met expectations won't sync, and all + // the listing is done using local stores. + oldRC := old.(*api.ReplicationController) + curRC := cur.(*api.ReplicationController) + if oldRC.Status.Replicas != curRC.Status.Replicas { + glog.V(4).Infof("Observed updated replica count for rc: %v, %d->%d", curRC.Name, oldRC.Status.Replicas, curRC.Status.Replicas) + } + rm.enqueueController(cur) + }, + // This will enter the sync loop and no-op, because the controller has been deleted from the store. + // Note that deleting a controller immediately after resizing it to 0 will not work. The recommended + // way of achieving this is by performing a `stop` operation on the controller. + DeleteFunc: rm.enqueueController, + }, + ) + + rm.podStore.Store, rm.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return rm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return rm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Pod{}, + PodRelistPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: rm.addPod, + // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill, + // the most frequent pod update is status, and the associated rc will only list from local storage, so + // it should be ok. + UpdateFunc: rm.updatePod, + DeleteFunc: rm.deletePod, + }, + ) + rm.syncHandler = rm.syncReplicationController return rm } // Run begins watching and syncing. -func (rm *ReplicationManager) Run(period time.Duration) { - rm.syncTime = time.Tick(period) - resourceVersion := "" - go util.Forever(func() { rm.watchControllers(&resourceVersion) }, period) +func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { + defer util.HandleCrash() + go rm.rcController.Run(stopCh) + go rm.podController.Run(stopCh) + for i := 0; i < workers; i++ { + go util.Until(rm.worker, time.Second, stopCh) + } + <-stopCh + rm.queue.ShutDown() } -// resourceVersion is a pointer to the resource version to use/update. 
-func (rm *ReplicationManager) watchControllers(resourceVersion *string) { - watching, err := rm.kubeClient.ReplicationControllers(api.NamespaceAll).Watch( - labels.Everything(), - fields.Everything(), - *resourceVersion, - ) +// getPodControllers returns the controller managing the given pod. +// TODO: Surface that we are ignoring multiple controllers for a single pod. +func (rm *ReplicationManager) getPodControllers(pod *api.Pod) *api.ReplicationController { + controllers, err := rm.controllerStore.GetPodControllers(pod) if err != nil { - util.HandleError(fmt.Errorf("unable to watch: %v", err)) - time.Sleep(5 * time.Second) + glog.V(4).Infof("No controllers found for pod %v, replication manager will avoid syncing", pod.Name) + return nil + } + return &controllers[0] +} + +// When a pod is created, enqueue the controller that manages it and update its expectations. +func (rm *ReplicationManager) addPod(obj interface{}) { + pod := obj.(*api.Pod) + if rc := rm.getPodControllers(pod); rc != nil { + rm.expectations.CreationObserved(rc) + rm.enqueueController(rc) + } +} + +// When a pod is updated, figure out what controller/s manage it and wake them +// up. If the labels of the pod have changed, we need to awaken both the old +// and new controller. old and cur must be *api.Pod types. +func (rm *ReplicationManager) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + // A periodic relist will send update events for all known pods. + return + } + // TODO: Write a unittest for this case + curPod := cur.(*api.Pod) + if rc := rm.getPodControllers(curPod); rc != nil { + rm.enqueueController(rc) + } + oldPod := old.(*api.Pod) + // Only need to get the old controller if the labels changed. + if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) { + // If the old and new rc are the same, the first one that syncs + // will set expectations preventing any damage from the second. + if oldRC := rm.getPodControllers(oldPod); oldRC != nil { + rm.enqueueController(oldRC) + } + } +} + +// When a pod is deleted, enqueue the controller that manages the pod and update its expectations. +// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. +func (rm *ReplicationManager) deletePod(obj interface{}) { + if pod, ok := obj.(*api.Pod); ok { + if rc := rm.getPodControllers(pod); rc != nil { + rm.expectations.DeletionObserved(rc) + rm.enqueueController(rc) + } + return + } + podKey, err := framework.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return + } + // A periodic relist might not have a pod that the store has; in such cases we are sent a tombstone key. + // We don't know which controllers to sync, so just let the controller relist handle this. + glog.Infof("Pod %q was deleted but we don't have a record of its final state so it could take up to %v before a controller recreates a replica.", podKey, ExpectationsTimeout) +} + +// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown marker item. +func (rm *ReplicationManager) enqueueController(obj interface{}) { + key, err := rcKeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) return } + rm.queue.Add(key) +} + +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. 
+func (rm *ReplicationManager) worker() { for { - select { - case <-rm.syncTime: - rm.synchronize() - case event, open := <-watching.ResultChan(): - if !open { - // watchChannel has been closed, or something else went - // wrong with our watch call. Let the util.Forever() - // that called us call us again. + func() { + key, quit := rm.queue.Get() + if quit { return } - if event.Type == watch.Error { - util.HandleError(fmt.Errorf("error from watch during sync: %v", errors.FromObject(event.Object))) - // Clear the resource version, this may cause us to skip some elements on the watch, - // but we'll catch them on the synchronize() call, so it works out. - *resourceVersion = "" - continue + defer rm.queue.Done(key) + err := rm.syncHandler(key.(string)) + if err != nil { + glog.Errorf("Error syncing replication controller: %v", err) } - glog.V(4).Infof("Got watch: %#v", event) - rc, ok := event.Object.(*api.ReplicationController) - if !ok { - if status, ok := event.Object.(*api.Status); ok { - if status.Status == api.StatusFailure { - glog.Errorf("Failed to watch: %v", status) - // Clear resource version here, as above, this won't hurt consistency, but we - // should consider introspecting more carefully here. (or make the apiserver smarter) - // "why not both?" - *resourceVersion = "" - continue - } - } - util.HandleError(fmt.Errorf("unexpected object: %#v", event.Object)) - continue - } - // If we get disconnected, start where we left off. - *resourceVersion = rc.ResourceVersion - // Sync even if this is a deletion event, to ensure that we leave - // it in the desired state. - glog.V(4).Infof("About to sync from watch: %q", rc.Name) - if err := rm.syncHandler(*rc); err != nil { - util.HandleError(fmt.Errorf("unexpected sync error: %v", err)) - } - } + }() } } -// filterActivePods returns pods that have not terminated. -func filterActivePods(pods []api.Pod) []*api.Pod { - var result []*api.Pod - for i := range pods { - if api.PodSucceeded != pods[i].Status.Phase && - api.PodFailed != pods[i].Status.Phase { - result = append(result, &pods[i]) - } - } - return result -} - -type activePods []*api.Pod - -func (s activePods) Len() int { return len(s) } -func (s activePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s activePods) Less(i, j int) bool { - // Unassigned < assigned - if s[i].Spec.Host == "" && s[j].Spec.Host != "" { - return true - } - // PodPending < PodUnknown < PodRunning - m := map[api.PodPhase]int{api.PodPending: 0, api.PodUnknown: 1, api.PodRunning: 2} - if m[s[i].Status.Phase] != m[s[j].Status.Phase] { - return m[s[i].Status.Phase] < m[s[j].Status.Phase] - } - // Not ready < ready - if !api.IsPodReady(s[i]) && api.IsPodReady(s[j]) { - return true - } - return false -} - -func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error { - s := labels.Set(controller.Spec.Selector).AsSelector() - podList, err := rm.kubeClient.Pods(controller.Namespace).List(s, fields.Everything()) - if err != nil { - return err - } - filteredList := filterActivePods(podList.Items) - numActivePods := len(filteredList) - diff := numActivePods - controller.Spec.Replicas +// manageReplicas checks and updates replicas for the given replication controller. 
+func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller *api.ReplicationController) { + diff := len(filteredPods) - controller.Spec.Replicas if diff < 0 { diff *= -1 + rm.expectations.ExpectCreations(controller, diff) wait := sync.WaitGroup{} wait.Add(diff) glog.V(2).Infof("Too few %q replicas, creating %d", controller.Name, diff) for i := 0; i < diff; i++ { go func() { defer wait.Done() - rm.podControl.createReplica(controller.Namespace, controller) + if err := rm.podControl.createReplica(controller.Namespace, controller); err != nil { + // Decrement the expected number of creates because the informer won't observe this pod + rm.expectations.CreationObserved(controller) + util.HandleError(err) + } }() } wait.Wait() } else if diff > 0 { + rm.expectations.ExpectDeletions(controller, diff) glog.V(2).Infof("Too many %q replicas, deleting %d", controller.Name, diff) // Sort the pods in the order such that not-ready < ready, unscheduled // < scheduled, and pending < running. This ensures that we delete pods // in the earlier stages whenever possible. - sort.Sort(activePods(filteredList)) + sort.Sort(activePods(filteredPods)) wait := sync.WaitGroup{} wait.Add(diff) for i := 0; i < diff; i++ { go func(ix int) { defer wait.Done() - rm.podControl.deletePod(controller.Namespace, filteredList[ix].Name) + if err := rm.podControl.deletePod(controller.Namespace, filteredPods[ix].Name); err != nil { + // Decrement the expected number of deletes because the informer won't observe this deletion + rm.expectations.DeletionObserved(controller) + } }(i) } wait.Wait() } - if controller.Status.Replicas != numActivePods { - controller.Status.Replicas = numActivePods - _, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller) - if err != nil { - return err - } +} + +// syncReplicationController will sync the rc with the given key if it has had its expectations fulfilled, meaning +// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked +// concurrently with the same key. +func (rm *ReplicationManager) syncReplicationController(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing controller %q (%v)", key, time.Now().Sub(startTime)) + }() + + obj, exists, err := rm.controllerStore.Store.GetByKey(key) + if !exists { + glog.Infof("Replication Controller has been deleted %v", key) + return nil + } + if err != nil { + glog.Infof("Unable to retrieve rc %v from store: %v", key, err) + rm.queue.Add(key) + return err + } + controller := *obj.(*api.ReplicationController) + + podList, err := rm.podStore.Pods(controller.Namespace).List(labels.Set(controller.Spec.Selector).AsSelector()) + if err != nil { + glog.Errorf("Error getting pods for rc %q: %v", key, err) + rm.queue.Add(key) + return err + } + // TODO: Do this in a single pass, or use an index. + filteredPods := filterActivePods(podList.Items) + + if rm.expectations.SatisfiedExpectations(&controller) { + rm.manageReplicas(filteredPods, &controller) + } + + // Always updates status as pods come up or die + if err := updateReplicaCount(rm.kubeClient.ReplicationControllers(controller.Namespace), &controller, len(filteredPods)); err != nil { + glog.V(2).Infof("Failed to update replica count for controller %v, will try on next sync", controller.Name) } return nil } - -func (rm *ReplicationManager) synchronize() { - // TODO: remove this method completely and rely on the watch. 
- // Add resource version tracking to watch to make this work. - var controllers []api.ReplicationController - list, err := rm.kubeClient.ReplicationControllers(api.NamespaceAll).List(labels.Everything()) - if err != nil { - util.HandleError(fmt.Errorf("synchronization error: %v", err)) - return - } - controllers = list.Items - wg := sync.WaitGroup{} - wg.Add(len(controllers)) - for ix := range controllers { - go func(ix int) { - defer wg.Done() - glog.V(4).Infof("periodic sync of %v", controllers[ix].Name) - err := rm.syncHandler(controllers[ix]) - if err != nil { - util.HandleError(fmt.Errorf("error synchronizing: %v", err)) - } - }(ix) - } - wg.Wait() -} diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 954dcf684eb..49b63b51a47 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -28,12 +28,15 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -41,31 +44,61 @@ type FakePodControl struct { controllerSpec []api.ReplicationController deletePodName []string lock sync.Mutex + err error } func init() { api.ForTesting_ReferencesAllowBlankSelfLinks = true } -func (f *FakePodControl) createReplica(namespace string, spec api.ReplicationController) { +func (f *FakePodControl) createReplica(namespace string, spec *api.ReplicationController) error { f.lock.Lock() defer f.lock.Unlock() - f.controllerSpec = append(f.controllerSpec, spec) + if f.err != nil { + return f.err + } + f.controllerSpec = append(f.controllerSpec, *spec) + return nil } func (f *FakePodControl) deletePod(namespace string, podName string) error { f.lock.Lock() defer f.lock.Unlock() + if f.err != nil { + return f.err + } f.deletePodName = append(f.deletePodName, podName) return nil } -func newReplicationController(replicas int) api.ReplicationController { - return api.ReplicationController{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, - ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: api.NamespaceDefault, ResourceVersion: "18"}, +func (f *FakePodControl) clear() { + f.lock.Lock() + defer f.lock.Unlock() + f.deletePodName = []string{} + f.controllerSpec = []api.ReplicationController{} +} + +func getKey(rc *api.ReplicationController, t *testing.T) string { + if key, err := rcKeyFunc(rc); err != nil { + t.Errorf("Unexpected error getting key for rc %v: %v", rc.Name, err) + return "" + } else { + return key + } +} + +func newReplicationController(replicas int) *api.ReplicationController { + rc := &api.ReplicationController{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, + ObjectMeta: api.ObjectMeta{ + UID: util.NewUUID(), + Name: "foobar", + Namespace: api.NamespaceDefault, + ResourceVersion: "18", + }, Spec: api.ReplicationControllerSpec{ Replicas: replicas, + Selector: map[string]string{"foo": "bar"}, Template: 
&api.PodTemplateSpec{ ObjectMeta: api.ObjectMeta{ Labels: map[string]string{ @@ -90,16 +123,25 @@ func newReplicationController(replicas int) api.ReplicationController { }, }, } + return rc } -func newPodList(count int) *api.PodList { +// create count pods with the given phase for the given rc (same selectors and namespace), and add them to the store. +func newPodList(store cache.Store, count int, status api.PodPhase, rc *api.ReplicationController) *api.PodList { pods := []api.Pod{} for i := 0; i < count; i++ { - pods = append(pods, api.Pod{ + newPod := api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: fmt.Sprintf("pod%d", i), + Name: fmt.Sprintf("pod%d", i), + Labels: rc.Spec.Selector, + Namespace: rc.Namespace, }, - }) + Status: api.PodStatus{Phase: status}, + } + if store != nil { + store.Add(&newPod) + } + pods = append(pods, newPod) } return &api.PodList{ Items: pods, @@ -156,74 +198,69 @@ func makeTestServer(t *testing.T, namespace, name string, podResponse, controlle return httptest.NewServer(mux), &fakeUpdateHandler } -func TestSyncReplicationControllerDoesNothing(t *testing.T) { - body, _ := latest.Codec.Encode(newPodList(2)) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), +func startManagerAndWait(manager *ReplicationManager, pods int, t *testing.T) chan struct{} { + stopCh := make(chan struct{}) + go manager.Run(1, stopCh) + err := wait.Poll(10*time.Millisecond, 100*time.Millisecond, func() (bool, error) { + podList, err := manager.podStore.List(labels.Everything()) + if err != nil { + return false, err + } + return len(podList) == pods, nil + }) + if err != nil { + t.Errorf("Failed to observe %d pods in 100ms", pods) } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + return stopCh +} +func TestSyncReplicationControllerDoesNothing(t *testing.T) { + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) fakePodControl := FakePodControl{} - manager := NewReplicationManager(client) - manager.podControl = &fakePodControl + // 2 running pods, a controller with 2 replicas, sync is a no-op controllerSpec := newReplicationController(2) + manager.controllerStore.Store.Add(controllerSpec) + newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec) - manager.syncReplicationController(controllerSpec) + manager.podControl = &fakePodControl + manager.syncReplicationController(getKey(controllerSpec, t)) validateSyncReplication(t, &fakePodControl, 0, 0) } func TestSyncReplicationControllerDeletes(t *testing.T) { - body, _ := latest.Codec.Encode(newPodList(2)) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(body), - } - testServer := httptest.NewServer(&fakeHandler) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) fakePodControl := FakePodControl{} - manager := NewReplicationManager(client) manager.podControl = &fakePodControl + // 2 running pods and a controller with 1 replica, one pod delete expected controllerSpec := newReplicationController(1) + manager.controllerStore.Store.Add(controllerSpec) + newPodList(manager.podStore.Store, 2, api.PodRunning, controllerSpec) - manager.syncReplicationController(controllerSpec) + manager.syncReplicationController(getKey(controllerSpec, t)) validateSyncReplication(t, &fakePodControl, 
0, 1) } func TestSyncReplicationControllerCreates(t *testing.T) { - controller := newReplicationController(2) - testServer, fakeUpdateHandler := makeTestServer(t, api.NamespaceDefault, controller.Name, - serverResponse{http.StatusOK, newPodList(0)}, - serverResponse{http.StatusInternalServerError, &api.ReplicationControllerList{}}, - serverResponse{http.StatusOK, &controller}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - + client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}) manager := NewReplicationManager(client) + + // A controller with 2 replicas and no pods in the store, 2 creates expected + controller := newReplicationController(2) + manager.controllerStore.Store.Add(controller) + fakePodControl := FakePodControl{} manager.podControl = &fakePodControl - manager.syncReplicationController(controller) + manager.syncReplicationController(getKey(controller, t)) validateSyncReplication(t, &fakePodControl, 2, 0) - - // No Status.Replicas update expected even though 2 pods were just created, - // because the controller manager can't observe the pods till the next sync cycle. - if fakeUpdateHandler.RequestReceived != nil { - t.Errorf("Unexpected updates for controller via %v", - fakeUpdateHandler.RequestReceived.URL) - } } func TestCreateReplica(t *testing.T) { ns := api.NamespaceDefault - body := runtime.EncodeOrDie(testapi.Codec(), &api.Pod{}) + body := runtime.EncodeOrDie(testapi.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "empty_pod"}}) fakeHandler := util.FakeHandler{ StatusCode: 200, ResponseBody: string(body), @@ -234,9 +271,12 @@ func TestCreateReplica(t *testing.T) { podControl := RealPodControl{ kubeClient: client, + recorder: &record.FakeRecorder{}, } controllerSpec := newReplicationController(1) + + // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template podControl.createReplica(ns, controllerSpec) manifest := api.ContainerManifest{} @@ -262,129 +302,72 @@ func TestCreateReplica(t *testing.T) { } } -func TestSynchronize(t *testing.T) { - controllerSpec1 := newReplicationController(4) - controllerSpec2 := newReplicationController(3) - controllerSpec2.Name = "bar" - controllerSpec2.Spec.Template.ObjectMeta.Labels = map[string]string{ - "name": "bar", - "type": "production", - } - - testServer, _ := makeTestServer(t, api.NamespaceDefault, "", - serverResponse{http.StatusOK, newPodList(0)}, - serverResponse{http.StatusOK, &api.ReplicationControllerList{ - Items: []api.ReplicationController{ - controllerSpec1, - controllerSpec2, - }}}, - serverResponse{http.StatusInternalServerError, &api.ReplicationController{}}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - manager := NewReplicationManager(client) - fakePodControl := FakePodControl{} - manager.podControl = &fakePodControl - - manager.synchronize() - - validateSyncReplication(t, &fakePodControl, 7, 0) -} - func TestControllerNoReplicaUpdate(t *testing.T) { - // Steady state for the replication controller, no Status.Replicas updates expected - rc := newReplicationController(5) - rc.Status = api.ReplicationControllerStatus{Replicas: 5} - activePods := 5 - - testServer, fakeUpdateHandler := makeTestServer(t, api.NamespaceDefault, rc.Name, - serverResponse{http.StatusOK, newPodList(activePods)}, - serverResponse{http.StatusOK, &api.ReplicationControllerList{ - Items: []api.ReplicationController{rc}, - }}, - 
serverResponse{http.StatusOK, &rc}) + // Setup a fake server to listen for requests, and run the rc manager in steady state + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) manager := NewReplicationManager(client) + + // Steady state for the replication controller, no Status.Replicas updates expected + activePods := 5 + rc := newReplicationController(activePods) + manager.controllerStore.Store.Add(rc) + rc.Status = api.ReplicationControllerStatus{Replicas: activePods} + newPodList(manager.podStore.Store, activePods, api.PodRunning, rc) + fakePodControl := FakePodControl{} manager.podControl = &fakePodControl - - manager.synchronize() + manager.syncReplicationController(getKey(rc, t)) validateSyncReplication(t, &fakePodControl, 0, 0) - if fakeUpdateHandler.RequestReceived != nil { - t.Errorf("Unexpected updates for controller via %v", - fakeUpdateHandler.RequestReceived.URL) + if fakeHandler.RequestReceived != nil { + t.Errorf("Unexpected update when pods and rcs are in a steady state") } } func TestControllerUpdateReplicas(t *testing.T) { + // This is a happy server just to record the PUT request we expect for status.Replicas + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + manager := NewReplicationManager(client) + // Insufficient number of pods in the system, and Status.Replicas is wrong; // Status.Replica should update to match number of pods in system, 1 new pod should be created. rc := newReplicationController(5) + manager.controllerStore.Store.Add(rc) rc.Status = api.ReplicationControllerStatus{Replicas: 2} - activePods := 4 + newPodList(manager.podStore.Store, 4, api.PodRunning, rc) + response := runtime.EncodeOrDie(testapi.Codec(), rc) + fakeHandler.ResponseBody = response - testServer, fakeUpdateHandler := makeTestServer(t, api.NamespaceDefault, rc.Name, - serverResponse{http.StatusOK, newPodList(activePods)}, - serverResponse{http.StatusOK, &api.ReplicationControllerList{ - Items: []api.ReplicationController{rc}, - }}, - serverResponse{http.StatusOK, &rc}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - manager := NewReplicationManager(client) fakePodControl := FakePodControl{} manager.podControl = &fakePodControl - manager.synchronize() + manager.syncReplicationController(getKey(rc, t)) // Status.Replicas should go up from 2->4 even though we created 5-4=1 pod rc.Status = api.ReplicationControllerStatus{Replicas: 4} - // These are set by default. 
- rc.Spec.Selector = rc.Spec.Template.Labels - rc.Labels = rc.Spec.Template.Labels - decRc := runtime.EncodeOrDie(testapi.Codec(), &rc) - fakeUpdateHandler.ValidateRequest(t, testapi.ResourcePathWithNamespaceQuery(replicationControllerResourceName(), rc.Namespace, rc.Name), "PUT", &decRc) + decRc := runtime.EncodeOrDie(testapi.Codec(), rc) + fakeHandler.ValidateRequest(t, testapi.ResourcePathWithNamespaceQuery(replicationControllerResourceName(), rc.Namespace, rc.Name), "PUT", &decRc) validateSyncReplication(t, &fakePodControl, 1, 0) } -type FakeWatcher struct { - w *watch.FakeWatcher - *testclient.Fake -} - -func TestWatchControllers(t *testing.T) { - fakeWatch := watch.NewFake() - client := &testclient.Fake{Watch: fakeWatch} - manager := NewReplicationManager(client) - var testControllerSpec api.ReplicationController - received := make(chan struct{}) - manager.syncHandler = func(controllerSpec api.ReplicationController) error { - if !api.Semantic.DeepDerivative(controllerSpec, testControllerSpec) { - t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec) - } - close(received) - return nil - } - - resourceVersion := "" - go manager.watchControllers(&resourceVersion) - - // Test normal case - testControllerSpec.Name = "foo" - - fakeWatch.Add(&testControllerSpec) - - select { - case <-received: - case <-time.After(100 * time.Millisecond): - t.Errorf("Expected 1 call but got 0") - } -} - func TestActivePodFiltering(t *testing.T) { - podList := newPodList(5) + // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. + rc := newReplicationController(0) + podList := newPodList(nil, 5, api.PodRunning, rc) podList.Items[0].Status.Phase = api.PodSucceeded podList.Items[1].Status.Phase = api.PodFailed expectedNames := util.NewStringSet() @@ -404,7 +387,10 @@ func TestActivePodFiltering(t *testing.T) { func TestSortingActivePods(t *testing.T) { numPods := 5 - podList := newPodList(numPods) + // This rc is not needed by the test, only the newPodList to give the pods labels/a namespace. + rc := newReplicationController(0) + podList := newPodList(nil, numPods, api.PodRunning, rc) + pods := make([]*api.Pod, len(podList.Items)) for i := range podList.Items { pods[i] = &podList.Items[i] @@ -450,3 +436,326 @@ func TestSortingActivePods(t *testing.T) { } } } + +// NewFakeRCExpectationsLookup creates a fake store for PodExpectations. +func NewFakeRCExpectationsLookup(ttl time.Duration) (*RCExpectations, *util.FakeClock) { + fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) + fakeClock := &util.FakeClock{fakeTime} + ttlPolicy := &cache.TTLPolicy{ttl, fakeClock} + ttlStore := cache.NewFakeExpirationStore( + expKeyFunc, nil, ttlPolicy, fakeClock) + return &RCExpectations{ttlStore}, fakeClock +} + +func TestRCExpectations(t *testing.T) { + ttl := 30 * time.Second + e, fakeClock := NewFakeRCExpectationsLookup(ttl) + // In practice we can't really have add and delete expectations since we only either create or + // delete replicas in one rc pass, and the rc goes to sleep soon after until the expectations are + // either fulfilled or timeout. 
+ adds, dels := 10, 30 + rc := newReplicationController(1) + + // RC fires off adds and deletes at apiserver, then sets expectations + e.setExpectations(rc, adds, dels) + var wg sync.WaitGroup + for i := 0; i < adds+1; i++ { + wg.Add(1) + go func() { + // In prod this can happen either because of a failed create by the rc + // or after having observed a create via informer + e.CreationObserved(rc) + wg.Done() + }() + } + wg.Wait() + + // There are still delete expectations + if e.SatisfiedExpectations(rc) { + t.Errorf("Rc will sync before expectations are met") + } + for i := 0; i < dels+1; i++ { + wg.Add(1) + go func() { + e.DeletionObserved(rc) + wg.Done() + }() + } + wg.Wait() + + // Expectations have been surpassed + if podExp, exists, err := e.GetExpectations(rc); err == nil && exists { + if podExp.add != -1 || podExp.del != -1 { + t.Errorf("Unexpected pod expectations %#v", podExp) + } + } else { + t.Errorf("Could not get expectations for rc, exists %v and err %v", exists, err) + } + if !e.SatisfiedExpectations(rc) { + t.Errorf("Expectations are met but the rc will not sync") + } + + // Next round of rc sync, old expectations are cleared + e.setExpectations(rc, 1, 2) + if podExp, exists, err := e.GetExpectations(rc); err == nil && exists { + if podExp.add != 1 || podExp.del != 2 { + t.Errorf("Unexpected pod expectations %#v", podExp) + } + } else { + t.Errorf("Could not get expectations for rc, exists %v and err %v", exists, err) + } + + // Expectations have expired because of ttl + fakeClock.Time = fakeClock.Time.Add(ttl + 1) + if !e.SatisfiedExpectations(rc) { + t.Errorf("Expectations should have expired but didn't") + } +} + +func TestSyncReplicationControllerDormancy(t *testing.T) { + // Setup a test server so we can lie about the current state of pods + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: "", + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + + fakePodControl := FakePodControl{} + manager := NewReplicationManager(client) + manager.podControl = &fakePodControl + + controllerSpec := newReplicationController(2) + manager.controllerStore.Store.Add(controllerSpec) + newPodList(manager.podStore.Store, 1, api.PodRunning, controllerSpec) + + // Creates a replica and sets expectations + controllerSpec.Status.Replicas = 1 + manager.syncReplicationController(getKey(controllerSpec, t)) + validateSyncReplication(t, &fakePodControl, 1, 0) + + // Expectations prevents replicas but not an update on status + controllerSpec.Status.Replicas = 0 + fakePodControl.clear() + manager.syncReplicationController(getKey(controllerSpec, t)) + validateSyncReplication(t, &fakePodControl, 0, 0) + + // Lowering expectations should lead to a sync that creates a replica, however the + // fakePodControl error will prevent this, leaving expectations at 0, 0 + manager.expectations.CreationObserved(controllerSpec) + controllerSpec.Status.Replicas = 1 + fakePodControl.clear() + fakePodControl.err = fmt.Errorf("Fake Error") + + manager.syncReplicationController(getKey(controllerSpec, t)) + validateSyncReplication(t, &fakePodControl, 0, 0) + + // This replica should not need a Lowering of expectations, since the previous create failed + fakePodControl.err = nil + manager.syncReplicationController(getKey(controllerSpec, t)) + validateSyncReplication(t, &fakePodControl, 1, 0) + + // 1 PUT for the rc status during dormancy window. 
+	// Note that the pod creates go through pod control so they're not recorded.
+	fakeHandler.ValidateRequestCount(t, 1)
+}
+
+func TestPodControllerLookup(t *testing.T) {
+	manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}))
+	testCases := []struct {
+		inRCs     []*api.ReplicationController
+		pod       *api.Pod
+		outRCName string
+	}{
+		// pods without labels don't match any rcs
+		{
+			inRCs: []*api.ReplicationController{
+				{ObjectMeta: api.ObjectMeta{Name: "basic"}}},
+			pod:       &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo1", Namespace: api.NamespaceAll}},
+			outRCName: "",
+		},
+		// Matching labels, not namespace
+		{
+			inRCs: []*api.ReplicationController{
+				{
+					ObjectMeta: api.ObjectMeta{Name: "foo"},
+					Spec: api.ReplicationControllerSpec{
+						Selector: map[string]string{"foo": "bar"},
+					},
+				},
+			},
+			pod: &api.Pod{
+				ObjectMeta: api.ObjectMeta{
+					Name: "foo2", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
+			outRCName: "",
+		},
+		// Matching ns and labels returns the key to the rc, not the rc name
+		{
+			inRCs: []*api.ReplicationController{
+				{
+					ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
+					Spec: api.ReplicationControllerSpec{
+						Selector: map[string]string{"foo": "bar"},
+					},
+				},
+			},
+			pod: &api.Pod{
+				ObjectMeta: api.ObjectMeta{
+					Name: "foo3", Namespace: "ns", Labels: map[string]string{"foo": "bar"}}},
+			outRCName: "bar",
+		},
+	}
+	for _, c := range testCases {
+		for _, r := range c.inRCs {
+			manager.controllerStore.Add(r)
+		}
+		if rc := manager.getPodControllers(c.pod); rc != nil {
+			if c.outRCName != rc.Name {
+				t.Errorf("Got controller %+v expected %+v", rc.Name, c.outRCName)
+			}
+		} else if c.outRCName != "" {
+			t.Errorf("Expected a controller %v pod %v, found none", c.outRCName, c.pod.Name)
+		}
+	}
+}
+
+type FakeWatcher struct {
+	w *watch.FakeWatcher
+	*testclient.Fake
+}
+
+func TestWatchControllers(t *testing.T) {
+	fakeWatch := watch.NewFake()
+	client := &testclient.Fake{Watch: fakeWatch}
+	manager := NewReplicationManager(client)
+
+	var testControllerSpec api.ReplicationController
+	received := make(chan string)
+
+	// The update sent through the fakeWatcher should make its way into the workqueue,
+	// and eventually into the syncHandler. The handler validates the received controller
+	// and closes the received channel to indicate that the test can finish.
+	manager.syncHandler = func(key string) error {
+
+		obj, exists, err := manager.controllerStore.Store.GetByKey(key)
+		if !exists || err != nil {
+			t.Errorf("Expected to find controller under key %v", key)
+		}
+		controllerSpec := *obj.(*api.ReplicationController)
+		if !api.Semantic.DeepDerivative(controllerSpec, testControllerSpec) {
+			t.Errorf("Expected %#v, but got %#v", testControllerSpec, controllerSpec)
+		}
+		close(received)
+		return nil
+	}
+	// Start only the rc watcher and the workqueue, send a watch event,
+	// and make sure it hits the sync method.
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go manager.rcController.Run(stopCh)
+	go util.Until(manager.worker, 10*time.Millisecond, stopCh)
+
+	testControllerSpec.Name = "foo"
+	fakeWatch.Add(&testControllerSpec)
+
+	select {
+	case <-received:
+	case <-time.After(100 * time.Millisecond):
+		t.Errorf("Expected 1 call but got 0")
+	}
+}
+
+func TestWatchPods(t *testing.T) {
+	fakeWatch := watch.NewFake()
+	client := &testclient.Fake{Watch: fakeWatch}
+	manager := NewReplicationManager(client)
+
+	// Put one rc and one pod into the controller's stores
+	testControllerSpec := newReplicationController(1)
+	manager.controllerStore.Store.Add(testControllerSpec)
+	received := make(chan string)
+	// The pod update sent through the fakeWatcher should figure out the managing rc and
+	// send it into the syncHandler.
+	manager.syncHandler = func(key string) error {
+
+		obj, exists, err := manager.controllerStore.Store.GetByKey(key)
+		if !exists || err != nil {
+			t.Errorf("Expected to find controller under key %v", key)
+		}
+		controllerSpec := obj.(*api.ReplicationController)
+		if !api.Semantic.DeepDerivative(controllerSpec, testControllerSpec) {
+			t.Errorf("\nExpected %#v,\nbut got %#v", testControllerSpec, controllerSpec)
+		}
+		close(received)
+		return nil
+	}
+	// Start only the pod watcher and the workqueue, send a watch event,
+	// and make sure it hits the sync method for the right rc.
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go manager.podController.Run(stopCh)
+	go util.Until(manager.worker, 10*time.Millisecond, stopCh)
+
+	pods := newPodList(nil, 1, api.PodRunning, testControllerSpec)
+	testPod := pods.Items[0]
+	testPod.Status.Phase = api.PodFailed
+	fakeWatch.Add(&testPod)
+
+	select {
+	case <-received:
+	case <-time.After(100 * time.Millisecond):
+		t.Errorf("Expected 1 call but got 0")
+	}
+}
+
+func TestUpdatePods(t *testing.T) {
+	fakeWatch := watch.NewFake()
+	client := &testclient.Fake{Watch: fakeWatch}
+	manager := NewReplicationManager(client)
+
+	received := make(chan string)
+
+	manager.syncHandler = func(key string) error {
+		obj, exists, err := manager.controllerStore.Store.GetByKey(key)
+		if !exists || err != nil {
+			t.Errorf("Expected to find controller under key %v", key)
+		}
+		received <- obj.(*api.ReplicationController).Name
+		return nil
+	}
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	go util.Until(manager.worker, 10*time.Millisecond, stopCh)
+
+	// Put 2 rcs and one pod into the controller's stores
+	testControllerSpec1 := newReplicationController(1)
+	manager.controllerStore.Store.Add(testControllerSpec1)
+	testControllerSpec2 := *testControllerSpec1
+	testControllerSpec2.Spec.Selector = map[string]string{"bar": "foo"}
+	testControllerSpec2.Name = "barfoo"
+	manager.controllerStore.Store.Add(&testControllerSpec2)
+
+	// Put one pod in the podStore
+	pod1 := newPodList(manager.podStore.Store, 1, api.PodRunning, testControllerSpec1).Items[0]
+	pod2 := pod1
+	pod2.Labels = testControllerSpec2.Spec.Selector
+
+	// Send an update of the same pod with modified labels, and confirm we get a sync request for
+	// both controllers
+	manager.updatePod(&pod1, &pod2)
+
+	expected := util.NewStringSet(testControllerSpec1.Name, testControllerSpec2.Name)
+	for _, name := range expected.List() {
+		t.Logf("Expecting update for %+v", name)
+		select {
+		case got := <-received:
+			if !expected.Has(got) {
+				t.Errorf("Expected keys %#v got %v", expected, got)
+			}
+		case <-time.After(100 * time.Millisecond):
+			t.Errorf("Expected update notifications for controllers within 100ms each")
+		}
+	}
+}
diff --git a/pkg/kubectl/cmd/resize.go b/pkg/kubectl/cmd/resize.go
index ec1cbdced31..8e8d32fa81c 100644
--- a/pkg/kubectl/cmd/resize.go
+++ b/pkg/kubectl/cmd/resize.go
@@ -23,7 +23,6 @@ import (
 
 	"github.com/spf13/cobra"
 
-	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl"
 	cmdutil "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/cmd/util"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl/resource"
@@ -43,7 +42,7 @@ $ kubectl resize --replicas=3 replicationcontrollers foo
 
 // If the replication controller named foo's current size is 2, resize foo to 3.
 $ kubectl resize --current-replicas=2 --replicas=3 replicationcontrollers foo`
 
-	retryFrequency = controller.DefaultSyncPeriod / 100
+	retryFrequency = 100 * time.Millisecond
 	retryTimeout   = 10 * time.Second
 )
diff --git a/pkg/labels/selector.go b/pkg/labels/selector.go
index a6a8fe4bfe8..f84758aa74f 100644
--- a/pkg/labels/selector.go
+++ b/pkg/labels/selector.go
@@ -665,7 +665,7 @@ func validateLabelValue(v string) error {
 }
 
 // SelectorFromSet returns a Selector which will match exactly the given Set. A
-// nil Set is considered equivalent to Everything().
+// nil and empty Sets are considered equivalent to Everything().
 func SelectorFromSet(ls Set) Selector {
 	if ls == nil {
 		return LabelSelector{}
diff --git a/pkg/labels/selector_test.go b/pkg/labels/selector_test.go
index 985ebb31169..152cca892a0 100644
--- a/pkg/labels/selector_test.go
+++ b/pkg/labels/selector_test.go
@@ -99,6 +99,7 @@ func TestSelectorMatches(t *testing.T) {
 	expectMatch(t, "x=y,z=w", Set{"x": "y", "z": "w"})
 	expectMatch(t, "x!=y,z!=w", Set{"x": "z", "z": "a"})
 	expectMatch(t, "notin=in", Set{"notin": "in"}) // in and notin in exactMatch
+	expectNoMatch(t, "x=z", Set{})
 	expectNoMatch(t, "x=y", Set{"x": "z"})
 	expectNoMatch(t, "x=y,z=w", Set{"x": "w", "z": "w"})
 	expectNoMatch(t, "x!=y,z!=w", Set{"x": "z", "z": "w"})
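
Reviewer note, not part of the patch: the dormancy behavior exercised by TestSyncReplicationControllerDormancy and TestRCExpectations above rests on per-rc add/del expectation counters that are decremented as creates and deletes are observed. The following is a minimal, self-contained Go sketch of that bookkeeping under stated assumptions: the names (simpleExpectations, Set, ObserveCreation, ObserveDeletion, Satisfied) are hypothetical and are not the patch's RCExpectations API, and the TTL/fake-clock expiry layered on by the real store is deliberately omitted.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counts tracks creates/deletes that have been requested but not yet observed.
type counts struct {
	add int64
	del int64
}

// simpleExpectations is a hypothetical, simplified stand-in for an expectations
// store: a map from rc name to outstanding counts.
type simpleExpectations struct {
	mu  sync.Mutex
	exp map[string]*counts
}

func newSimpleExpectations() *simpleExpectations {
	return &simpleExpectations{exp: map[string]*counts{}}
}

// Set records how many creates and deletes a sync pass has just issued.
func (s *simpleExpectations) Set(rc string, add, del int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.exp[rc] = &counts{add: int64(add), del: int64(del)}
}

// ObserveCreation decrements the outstanding create count for rc.
func (s *simpleExpectations) ObserveCreation(rc string) {
	if c := s.get(rc); c != nil {
		atomic.AddInt64(&c.add, -1)
	}
}

// ObserveDeletion decrements the outstanding delete count for rc.
func (s *simpleExpectations) ObserveDeletion(rc string) {
	if c := s.get(rc); c != nil {
		atomic.AddInt64(&c.del, -1)
	}
}

// Satisfied reports whether rc has no outstanding creates or deletes left,
// i.e. whether the next sync pass should act instead of staying dormant.
// Counts may legitimately go negative when observations outpace expectations.
func (s *simpleExpectations) Satisfied(rc string) bool {
	c := s.get(rc)
	if c == nil {
		return true // nothing recorded, safe to sync
	}
	return atomic.LoadInt64(&c.add) <= 0 && atomic.LoadInt64(&c.del) <= 0
}

func (s *simpleExpectations) get(rc string) *counts {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.exp[rc]
}

func main() {
	e := newSimpleExpectations()
	e.Set("foo", 2, 0)
	fmt.Println(e.Satisfied("foo")) // false: two creates still unobserved
	e.ObserveCreation("foo")
	e.ObserveCreation("foo")
	fmt.Println(e.Satisfied("foo")) // true: expectations met, rc may sync again
}

The property the tests above rely on is visible here: counters may dip below zero when observations outnumber expectations, and the controller is considered ready to sync as soon as both counters are at or below zero.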