diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index ec4f6d82ebe..58024abf1a3 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -53,6 +53,7 @@ import (
 	persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume"
 	"k8s.io/kubernetes/pkg/controller/podautoscaler"
 	"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
+	replicaset "k8s.io/kubernetes/pkg/controller/replicaset"
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
 	resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
 	routecontroller "k8s.io/kubernetes/pkg/controller/route"
@@ -278,6 +279,12 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 			go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)).
 				Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
 		}
+
+		if containsResource(resources, "replicasets") {
+			glog.Infof("Starting ReplicaSet controller")
+			go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas).
+				Run(s.ConcurrentRSSyncs, util.NeverStop)
+		}
 	}

 	volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)
diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go
index 3f0679b99de..3fad72cb7b6 100644
--- a/cmd/kube-controller-manager/app/options/options.go
+++ b/cmd/kube-controller-manager/app/options/options.go
@@ -39,6 +39,7 @@ type CMServer struct {
 	CloudConfigFile              string
 	ConcurrentEndpointSyncs      int
 	ConcurrentRCSyncs            int
+	ConcurrentRSSyncs            int
 	ConcurrentDSCSyncs           int
 	ConcurrentJobSyncs           int
 	ConcurrentResourceQuotaSyncs int
@@ -99,6 +100,7 @@ func NewCMServer() *CMServer {
 		Address:                      net.ParseIP("0.0.0.0"),
 		ConcurrentEndpointSyncs:      5,
 		ConcurrentRCSyncs:            5,
+		ConcurrentRSSyncs:            5,
 		ConcurrentDSCSyncs:           2,
 		ConcurrentJobSyncs:           5,
 		ConcurrentResourceQuotaSyncs: 5,
@@ -142,6 +144,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
 	fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
 	fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load")
+	fs.IntVar(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
 	fs.IntVar(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
 	fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load")
 	fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers")
diff --git a/docs/admin/kube-controller-manager.md b/docs/admin/kube-controller-manager.md
index a81419f536d..3f92655dcc2 100644
--- a/docs/admin/kube-controller-manager.md
+++ b/docs/admin/kube-controller-manager.md
@@ -63,6 +63,7 @@ kube-controller-manager
       --cluster-name="kubernetes": The instance prefix for the cluster
       --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load
       --concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load
+      --concurrent-replicaset-syncs=5: The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
       --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load
       --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load
       --deleting-pods-burst=10: Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.
diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt
index e6d4c652b11..d3afecf3d7c 100644
--- a/hack/verify-flags/known-flags.txt
+++ b/hack/verify-flags/known-flags.txt
@@ -47,6 +47,7 @@ cluster-name
 cluster-tag
 concurrent-deployment-syncs
 concurrent-endpoint-syncs
+concurrent-replicaset-syncs
 concurrent-resource-quota-syncs
 config-sync-period
 configure-cbr0
diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go
index c21b0d4aae9..f3db7709619 100644
--- a/pkg/controller/namespace/namespace_controller.go
+++ b/pkg/controller/namespace/namespace_controller.go
@@ -234,6 +234,12 @@ func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIV
 				return estimate, err
 			}
 		}
+		if containsResource(resources, "replicasets") {
+			err = deleteReplicaSets(kubeClient.Extensions(), namespace)
+			if err != nil {
+				return estimate, err
+			}
+		}
 	}
 	return estimate, nil
 }
@@ -538,6 +544,20 @@ func deleteIngress(expClient extensions_unversioned.ExtensionsInterface, ns stri
 	return nil
 }

+func deleteReplicaSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
+	items, err := expClient.ReplicaSets(ns).List(api.ListOptions{})
+	if err != nil {
+		return err
+	}
+	for i := range items.Items {
+		err := expClient.ReplicaSets(ns).Delete(items.Items[i].Name, nil)
+		if err != nil && !errors.IsNotFound(err) {
+			return err
+		}
+	}
+	return nil
+}
+
 // TODO: this is duplicated logic. Move it somewhere central?
 func containsVersion(versions *unversioned.APIVersions, version string) bool {
 	for ix := range versions.Versions {
diff --git a/pkg/controller/replicaset/options/options.go b/pkg/controller/replicaset/options/options.go
new file mode 100644
index 00000000000..91951a54927
--- /dev/null
+++ b/pkg/controller/replicaset/options/options.go
@@ -0,0 +1,35 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
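Aside for readers, not part of the patch: the new options package above only defines and registers a flag. A minimal sketch of how a caller might wire it up, with the flag-set name and sample argument invented for illustration:

package main

import (
	"fmt"

	"github.com/spf13/pflag"

	"k8s.io/kubernetes/pkg/controller/replicaset/options"
)

func main() {
	// Hypothetical wiring: register the ReplicaSet controller flags on a flag set.
	fs := pflag.NewFlagSet("replicaset-controller", pflag.ExitOnError)
	opts := options.NewReplicasetControllerOptions()
	opts.AddFlags(fs)

	// Parsing a sample argument overrides the default of 5.
	fs.Parse([]string{"--concurrent-replicaset-syncs=10"})
	fmt.Println(opts.ConcurrentRSSyncs) // prints 10
}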
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package options
+
+import (
+	"github.com/spf13/pflag"
+)
+
+type ReplicasetControllerOptions struct {
+	ConcurrentRSSyncs int
+}
+
+func NewReplicasetControllerOptions() ReplicasetControllerOptions {
+	return ReplicasetControllerOptions{
+		ConcurrentRSSyncs: 5,
+	}
+}
+
+func (o *ReplicasetControllerOptions) AddFlags(fs *pflag.FlagSet) {
+	fs.IntVar(&o.ConcurrentRSSyncs, "concurrent-replicaset-syncs", o.ConcurrentRSSyncs, "The number of replicasets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
+}
diff --git a/pkg/kubectl/kubectl.go b/pkg/kubectl/kubectl.go
index c7ce1bf6a0e..aa3db5ed082 100644
--- a/pkg/kubectl/kubectl.go
+++ b/pkg/kubectl/kubectl.go
@@ -118,6 +118,7 @@ func expandResourceShortcut(resource unversioned.GroupVersionResource) unversion
 		"pv":    api.SchemeGroupVersion.WithResource("persistentvolumes"),
 		"quota": api.SchemeGroupVersion.WithResource("resourcequotas"),
 		"rc":    api.SchemeGroupVersion.WithResource("replicationcontrollers"),
+		"rs":    extensions.SchemeGroupVersion.WithResource("replicasets"),
 		"svc":   api.SchemeGroupVersion.WithResource("services"),
 	}
 	if expanded, ok := shortForms[resource.Resource]; ok {
diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go
index 80f07df9201..44214cecb8e 100644
--- a/pkg/kubectl/scale.go
+++ b/pkg/kubectl/scale.go
@@ -44,6 +44,8 @@ func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) {
 	switch kind {
 	case api.Kind("ReplicationController"):
 		return &ReplicationControllerScaler{c}, nil
+	case extensions.Kind("ReplicaSet"):
+		return &ReplicaSetScaler{c.Extensions()}, nil
 	case extensions.Kind("Job"):
 		return &JobScaler{c.Extensions()}, nil
 	case extensions.Kind("Deployment"):
@@ -186,6 +188,68 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize
 	return nil
 }

+// ValidateReplicaSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
+func (precondition *ScalePrecondition) ValidateReplicaSet(replicaSet *extensions.ReplicaSet) error {
+	if precondition.Size != -1 && replicaSet.Spec.Replicas != precondition.Size {
+		return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(replicaSet.Spec.Replicas)}
+	}
+	if len(precondition.ResourceVersion) != 0 && replicaSet.ResourceVersion != precondition.ResourceVersion {
+		return PreconditionError{"resource version", precondition.ResourceVersion, replicaSet.ResourceVersion}
+	}
+	return nil
+}
+
+type ReplicaSetScaler struct {
+	c client.ExtensionsInterface
+}
+
+func (scaler *ReplicaSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error {
+	rs, err := scaler.c.ReplicaSets(namespace).Get(name)
+	if err != nil {
+		return ScaleError{ScaleGetFailure, "Unknown", err}
+	}
+	if preconditions != nil {
+		if err := preconditions.ValidateReplicaSet(rs); err != nil {
+			return err
+		}
+	}
+	rs.Spec.Replicas = int(newSize)
+	// TODO: do retry on 409 errors here?
+	if _, err := scaler.c.ReplicaSets(namespace).Update(rs); err != nil {
+		if errors.IsInvalid(err) {
+			return ScaleError{ScaleUpdateInvalidFailure, rs.ResourceVersion, err}
+		}
+		return ScaleError{ScaleUpdateFailure, rs.ResourceVersion, err}
+	}
+	return nil
+}
+
+// Scale updates a ReplicaSet to a new size, with optional precondition check (if preconditions is
+// not nil), optional retries (if retry is not nil), and then optionally waits for its replica
+// count to reach the new value (if wait is not nil).
+func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
+	if preconditions == nil {
+		preconditions = &ScalePrecondition{-1, ""}
+	}
+	if retry == nil {
+		// Make it try only once, immediately
+		retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
+	}
+	cond := ScaleCondition(scaler, preconditions, namespace, name, newSize)
+	if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
+		return err
+	}
+	if waitForReplicas != nil {
+		rs, err := scaler.c.ReplicaSets(namespace).Get(name)
+		if err != nil {
+			return err
+		}
+		return wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout,
+			client.ReplicaSetHasDesiredReplicas(scaler.c, rs))
+	}
+	return nil
+}
+
 // ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
 func (precondition *ScalePrecondition) ValidateJob(job *extensions.Job) error {
 	if precondition.Size != -1 && job.Spec.Parallelism == nil {
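Illustrative sketch, not part of the patch: with ReplicaSetScaler wired into ScalerFor above, a caller holding an already-configured client.Interface could scale a ReplicaSet roughly as follows. The helper name and the retry/wait intervals are assumptions made up for the example.

package example // hypothetical usage fragment, not in the patch

import (
	"time"

	"k8s.io/kubernetes/pkg/apis/extensions"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/kubectl"
)

// scaleReplicaSet resolves a Scaler for the ReplicaSet kind and scales the
// named ReplicaSet, retrying the update and then waiting for the replica
// count to reach the requested size.
func scaleReplicaSet(c client.Interface, namespace, name string, size uint) error {
	scaler, err := kubectl.ScalerFor(extensions.Kind("ReplicaSet"), c)
	if err != nil {
		return err
	}
	retry := kubectl.NewRetryParams(time.Second, 30*time.Second)
	waitForReplicas := kubectl.NewRetryParams(time.Second, 2*time.Minute)
	return scaler.Scale(namespace, name, size, nil, retry, waitForReplicas)
}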
diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go
index 9f4a406e20e..a430089b878 100644
--- a/pkg/kubectl/stop.go
+++ b/pkg/kubectl/stop.go
@@ -63,6 +63,9 @@ func ReaperFor(kind unversioned.GroupKind, c client.Interface) (Reaper, error) {
 	case api.Kind("ReplicationController"):
 		return &ReplicationControllerReaper{c, Interval, Timeout}, nil

+	case extensions.Kind("ReplicaSet"):
+		return &ReplicaSetReaper{c, Interval, Timeout}, nil
+
 	case extensions.Kind("DaemonSet"):
 		return &DaemonSetReaper{c, Interval, Timeout}, nil

@@ -87,6 +90,10 @@ type ReplicationControllerReaper struct {
 	client.Interface
 	pollInterval, timeout time.Duration
 }
+type ReplicaSetReaper struct {
+	client.Interface
+	pollInterval, timeout time.Duration
+}
 type DaemonSetReaper struct {
 	client.Interface
 	pollInterval, timeout time.Duration
@@ -190,6 +197,81 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout
 	return nil
 }

+// TODO(madhusudancs): Implement it when controllerRef is implemented - https://github.com/kubernetes/kubernetes/issues/2210
+// getOverlappingReplicaSets finds ReplicaSets that this ReplicaSet overlaps, as well as ReplicaSets overlapping this ReplicaSet.
+func getOverlappingReplicaSets(c client.ReplicaSetInterface, rs *extensions.ReplicaSet) ([]extensions.ReplicaSet, []extensions.ReplicaSet, error) {
+	var overlappingRSs, exactMatchRSs []extensions.ReplicaSet
+	return overlappingRSs, exactMatchRSs, nil
+}
+
+func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
+	rsc := reaper.Extensions().ReplicaSets(namespace)
+	scaler, err := ScalerFor(extensions.Kind("ReplicaSet"), *reaper)
+	if err != nil {
+		return err
+	}
+	rs, err := rsc.Get(name)
+	if err != nil {
+		return err
+	}
+	if timeout == 0 {
+		timeout = Timeout + time.Duration(10*rs.Spec.Replicas)*time.Second
+	}
+
+	// The ReplicaSet controller will try and detect all matching ReplicaSets
+	// for a pod's labels, and only sync the oldest one. This means if we have
+	// a pod with labels [(k1: v1), (k2: v2)] and two ReplicaSets: rs1 with
+	// selector [(k1=v1)], and rs2 with selector [(k1=v1),(k2=v2)], the
+	// ReplicaSet controller will sync the older of the two ReplicaSets.
+	//
+	// If there are ReplicaSets with a superset of labels, eg:
+	// deleting: (k1=v1), superset: (k2=v2, k1=v1)
+	// - It isn't safe to delete the ReplicaSet because there could be a pod
+	//   with labels (k1=v1) that isn't managed by the superset ReplicaSet.
+	//   We can't scale it down either, because there could be a pod
+	//   (k2=v2, k1=v1) that it deletes causing a fight with the superset
+	//   ReplicaSet.
+	// If there are ReplicaSets with a subset of labels, eg:
+	// deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3)
+	// - Even if it's safe to delete this ReplicaSet without a scale down because
+	//   all its pods are being controlled by the subset ReplicaSet the code
+	//   returns an error.
+
+	// In theory, creating overlapping ReplicaSets is user error, so the loop below
+	// tries to account for this logic only in the common case, where we end up
+	// with multiple ReplicaSets that have an exact match on selectors.
+
+	// TODO(madhusudancs): Re-evaluate again when controllerRef is implemented -
+	// https://github.com/kubernetes/kubernetes/issues/2210
+	overlappingRSs, exactMatchRSs, err := getOverlappingReplicaSets(rsc, rs)
+	if err != nil {
+		return fmt.Errorf("error getting ReplicaSets: %v", err)
+	}
+
+	if len(overlappingRSs) > 0 {
+		var names []string
+		for _, overlappingRS := range overlappingRSs {
+			names = append(names, overlappingRS.Name)
+		}
+		return fmt.Errorf(
+			"Detected overlapping ReplicaSets for ReplicaSet %v: %v, please manage deletion individually with --cascade=false.",
+			rs.Name, strings.Join(names, ","))
+	}
+	if len(exactMatchRSs) == 0 {
+		// No overlapping ReplicaSets.
+		retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
+		waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
+		if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
+			return err
+		}
+	}
+
+	if err := rsc.Delete(name, gracePeriod); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
 	ds, err := reaper.Extensions().DaemonSets(namespace).Get(name)
 	if err != nil {
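Illustrative sketch, not part of the patch: callers are expected to reach the new reaper through ReaperFor, as kubectl's stop/delete path does for other kinds. The helper below is invented for the example.

package example // hypothetical usage fragment, not in the patch

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/extensions"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/kubectl"
)

// stopReplicaSet scales the named ReplicaSet down to zero and then deletes it.
// Passing a zero timeout lets the reaper derive one from the current replica count.
func stopReplicaSet(c client.Interface, namespace, name string, gracePeriod *api.DeleteOptions) error {
	reaper, err := kubectl.ReaperFor(extensions.Kind("ReplicaSet"), c)
	if err != nil {
		return err
	}
	return reaper.Stop(namespace, name, 0, gracePeriod)
}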
diff --git a/pkg/kubectl/stop_test.go b/pkg/kubectl/stop_test.go
index 7fbb53b1ddc..5540815b347 100644
--- a/pkg/kubectl/stop_test.go
+++ b/pkg/kubectl/stop_test.go
@@ -264,6 +264,116 @@ func TestReplicationControllerStop(t *testing.T) {
 	}
 }

+func TestReplicaSetStop(t *testing.T) {
+	name := "foo"
+	ns := "default"
+	tests := []struct {
+		Name            string
+		Objs            []runtime.Object
+		StopError       error
+		ExpectedActions []string
+	}{
+		{
+			Name: "OnlyOneRS",
+			Objs: []runtime.Object{
+				&extensions.ReplicaSet{ // GET
+					ObjectMeta: api.ObjectMeta{
+						Name:      name,
+						Namespace: ns,
+					},
+					Spec: extensions.ReplicaSetSpec{
+						Replicas: 0,
+						Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
+					},
+				},
+				&extensions.ReplicaSetList{ // LIST
+					Items: []extensions.ReplicaSet{
+						{
+							ObjectMeta: api.ObjectMeta{
+								Name:      name,
+								Namespace: ns,
+							},
+							Spec: extensions.ReplicaSetSpec{
+								Replicas: 0,
+								Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
+							},
+						},
+					},
+				},
+			},
+			StopError:       nil,
+			ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"},
+		},
+		{
+			Name: "NoOverlapping",
+			Objs: []runtime.Object{
+				&extensions.ReplicaSet{ // GET
+					ObjectMeta: api.ObjectMeta{
+						Name:      name,
+						Namespace: ns,
+					},
+					Spec: extensions.ReplicaSetSpec{
+						Replicas: 0,
+						Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
+					},
+				},
+				&extensions.ReplicaSetList{ // LIST
+					Items: []extensions.ReplicaSet{
+						{
+							ObjectMeta: api.ObjectMeta{
+								Name:      "baz",
+								Namespace: ns,
+							},
+							Spec: extensions.ReplicaSetSpec{
+								Replicas: 0,
+								Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k3": "v3"}},
+							},
+						},
+						{
+							ObjectMeta: api.ObjectMeta{
+								Name:      name,
+								Namespace: ns,
+							},
+							Spec: extensions.ReplicaSetSpec{
+								Replicas: 0,
+								Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
+							},
+						},
+					},
+				},
+			},
+			StopError:       nil,
+			ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"},
+		},
+		// TODO: Implement tests for overlapping replica sets, similar to replication controllers,
+		// when the overlapping checks are implemented for replica sets.
+	}
+
+	for _, test := range tests {
+		fake := testclient.NewSimpleFake(test.Objs...)
+		reaper := ReplicaSetReaper{fake, time.Millisecond, time.Millisecond}
+		err := reaper.Stop(ns, name, 0, nil)
+		if !reflect.DeepEqual(err, test.StopError) {
+			t.Errorf("%s unexpected error: %v", test.Name, err)
+			continue
+		}
+
+		actions := fake.Actions()
+		if len(actions) != len(test.ExpectedActions) {
+			t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
+			continue
+		}
+		for i, verb := range test.ExpectedActions {
+			if actions[i].GetResource() != "replicasets" {
+				t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb)
+			}
+			if actions[i].GetVerb() != verb {
+				t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb)
+			}
+		}
+	}
+}
+
 func TestJobStop(t *testing.T) {
 	name := "foo"
 	ns := "default"
diff --git a/pkg/master/master.go b/pkg/master/master.go
index 70a8a3ac848..ed78f246e67 100644
--- a/pkg/master/master.go
+++ b/pkg/master/master.go
@@ -59,6 +59,7 @@ import (
 	podetcd "k8s.io/kubernetes/pkg/registry/pod/etcd"
 	pspetcd "k8s.io/kubernetes/pkg/registry/podsecuritypolicy/etcd"
 	podtemplateetcd "k8s.io/kubernetes/pkg/registry/podtemplate/etcd"
+	replicasetetcd "k8s.io/kubernetes/pkg/registry/replicaset/etcd"
 	resourcequotaetcd "k8s.io/kubernetes/pkg/registry/resourcequota/etcd"
 	secretetcd "k8s.io/kubernetes/pkg/registry/secret/etcd"
 	"k8s.io/kubernetes/pkg/registry/service"
@@ -640,6 +641,11 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
 		podSecurityPolicyStorage := pspetcd.NewREST(dbClient("podsecuritypolicy"), storageDecorator)
 		storage["podSecurityPolicies"] = podSecurityPolicyStorage
 	}
+	if isEnabled("replicasets") {
+		replicaSetStorage := replicasetetcd.NewStorage(dbClient("replicasets"), storageDecorator)
+		storage["replicasets"] = replicaSetStorage.ReplicaSet
+		storage["replicasets/status"] = replicaSetStorage.Status
+	}
 	return storage
 }