From 4f9b8b2bfc8c2d64b921a19f0e19d025c5cc33d3 Mon Sep 17 00:00:00 2001 From: "Madhusudan.C.S" Date: Mon, 18 Jan 2016 16:06:57 -0800 Subject: [PATCH] Link all the ReplicaSet controller boilerplate together. 1. Enable replica set in controller manager. 2. Enable replica set etcd storage in master package. 3. Add replica set support to kubectl commands. --- .../app/controllermanager.go | 7 ++ .../app/options/options.go | 3 + docs/admin/kube-controller-manager.md | 1 + hack/verify-flags/known-flags.txt | 1 + .../namespace/namespace_controller.go | 20 ++++ pkg/controller/replicaset/options/options.go | 35 ++++++ pkg/kubectl/kubectl.go | 1 + pkg/kubectl/scale.go | 64 ++++++++++ pkg/kubectl/stop.go | 82 +++++++++++++ pkg/kubectl/stop_test.go | 110 ++++++++++++++++++ pkg/master/master.go | 6 + 11 files changed, 330 insertions(+) create mode 100644 pkg/controller/replicaset/options/options.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index ec4f6d82ebe..58024abf1a3 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -53,6 +53,7 @@ import ( persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume" "k8s.io/kubernetes/pkg/controller/podautoscaler" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" + replicaset "k8s.io/kubernetes/pkg/controller/replicaset" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" routecontroller "k8s.io/kubernetes/pkg/controller/route" @@ -278,6 +279,12 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)). Run(s.ConcurrentDeploymentSyncs, util.NeverStop) } + + if containsResource(resources, "replicasets") { + glog.Infof("Starting ReplicaSet controller") + go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas). + Run(s.ConcurrentRSSyncs, util.NeverStop) + } } volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfigFlags) diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index 3f0679b99de..3fad72cb7b6 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -39,6 +39,7 @@ type CMServer struct { CloudConfigFile string ConcurrentEndpointSyncs int ConcurrentRCSyncs int + ConcurrentRSSyncs int ConcurrentDSCSyncs int ConcurrentJobSyncs int ConcurrentResourceQuotaSyncs int @@ -99,6 +100,7 @@ func NewCMServer() *CMServer { Address: net.ParseIP("0.0.0.0"), ConcurrentEndpointSyncs: 5, ConcurrentRCSyncs: 5, + ConcurrentRSSyncs: 5, ConcurrentDSCSyncs: 2, ConcurrentJobSyncs: 5, ConcurrentResourceQuotaSyncs: 5, @@ -142,6 +144,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. 
Larger number = faster endpoint updating, but more CPU (and network) load") fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load") fs.IntVar(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load") fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load") fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers") diff --git a/docs/admin/kube-controller-manager.md b/docs/admin/kube-controller-manager.md index a81419f536d..3f92655dcc2 100644 --- a/docs/admin/kube-controller-manager.md +++ b/docs/admin/kube-controller-manager.md @@ -63,6 +63,7 @@ kube-controller-manager --cluster-name="kubernetes": The instance prefix for the cluster --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load --concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load + --concurrent-replicaset-syncs=5: The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load --deleting-pods-burst=10: Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.
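Note: the new --concurrent-replicaset-syncs flag is registered and parsed through pflag exactly like the other sync-concurrency flags above. A minimal, self-contained sketch of that pattern (not part of this patch; the flag-set name and the sample command line below are illustrative only):

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	// Default of 5 mirrors NewCMServer() above.
	concurrentRSSyncs := 5
	fs := pflag.NewFlagSet("kube-controller-manager", pflag.ExitOnError)
	fs.IntVar(&concurrentRSSyncs, "concurrent-replicaset-syncs", concurrentRSSyncs,
		"The number of replica sets that are allowed to sync concurrently.")

	// Parsing a hypothetical command line overrides the default.
	_ = fs.Parse([]string{"--concurrent-replicaset-syncs=10"})
	fmt.Println(concurrentRSSyncs) // prints 10
}
```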
diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index e6d4c652b11..d3afecf3d7c 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -47,6 +47,7 @@ cluster-name cluster-tag concurrent-deployment-syncs concurrent-endpoint-syncs +concurrent-replicaset-syncs concurrent-resource-quota-syncs config-sync-period configure-cbr0 diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index c21b0d4aae9..f3db7709619 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -234,6 +234,12 @@ func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIV return estimate, err } } + if containsResource(resources, "replicasets") { + err = deleteReplicaSets(kubeClient.Extensions(), namespace) + if err != nil { + return estimate, err + } + } } return estimate, nil } @@ -538,6 +544,20 @@ func deleteIngress(expClient extensions_unversioned.ExtensionsInterface, ns stri return nil } +func deleteReplicaSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error { + items, err := expClient.ReplicaSets(ns).List(api.ListOptions{}) + if err != nil { + return err + } + for i := range items.Items { + err := expClient.ReplicaSets(ns).Delete(items.Items[i].Name, nil) + if err != nil && !errors.IsNotFound(err) { + return err + } + } + return nil +} + // TODO: this is duplicated logic. Move it somewhere central? func containsVersion(versions *unversioned.APIVersions, version string) bool { for ix := range versions.Versions { diff --git a/pkg/controller/replicaset/options/options.go b/pkg/controller/replicaset/options/options.go new file mode 100644 index 00000000000..91951a54927 --- /dev/null +++ b/pkg/controller/replicaset/options/options.go @@ -0,0 +1,35 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "github.com/spf13/pflag" +) + +type ReplicasetControllerOptions struct { + ConcurrentRSSyncs int +} + +func NewReplicasetControllerOptions() ReplicasetControllerOptions { + return ReplicasetControllerOptions{ + ConcurrentRSSyncs: 5, + } +} + +func (o *ReplicasetControllerOptions) AddFlags(fs *pflag.FlagSet) { + fs.IntVar(&o.ConcurrentRSSyncs, "concurrent-replicaset-syncs", o.ConcurrentRSSyncs, "The number of replicasets that are allowed to sync concurrently. 
Larger number = more responsive replica management, but more CPU (and network) load") +} diff --git a/pkg/kubectl/kubectl.go b/pkg/kubectl/kubectl.go index c7ce1bf6a0e..aa3db5ed082 100644 --- a/pkg/kubectl/kubectl.go +++ b/pkg/kubectl/kubectl.go @@ -118,6 +118,7 @@ func expandResourceShortcut(resource unversioned.GroupVersionResource) unversion "pv": api.SchemeGroupVersion.WithResource("persistentvolumes"), "quota": api.SchemeGroupVersion.WithResource("resourcequotas"), "rc": api.SchemeGroupVersion.WithResource("replicationcontrollers"), + "rs": extensions.SchemeGroupVersion.WithResource("replicasets"), "svc": api.SchemeGroupVersion.WithResource("services"), } if expanded, ok := shortForms[resource.Resource]; ok { diff --git a/pkg/kubectl/scale.go b/pkg/kubectl/scale.go index 80f07df9201..44214cecb8e 100644 --- a/pkg/kubectl/scale.go +++ b/pkg/kubectl/scale.go @@ -44,6 +44,8 @@ func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) { switch kind { case api.Kind("ReplicationController"): return &ReplicationControllerScaler{c}, nil + case extensions.Kind("ReplicaSet"): + return &ReplicaSetScaler{c.Extensions()}, nil case extensions.Kind("Job"): return &JobScaler{c.Extensions()}, nil case extensions.Kind("Deployment"): @@ -186,6 +188,68 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize return nil } +// ValidateReplicaSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise +func (precondition *ScalePrecondition) ValidateReplicaSet(replicaSet *extensions.ReplicaSet) error { + if precondition.Size != -1 && replicaSet.Spec.Replicas != precondition.Size { + return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(replicaSet.Spec.Replicas)} + } + if len(precondition.ResourceVersion) != 0 && replicaSet.ResourceVersion != precondition.ResourceVersion { + return PreconditionError{"resource version", precondition.ResourceVersion, replicaSet.ResourceVersion} + } + return nil +} + +type ReplicaSetScaler struct { + c client.ExtensionsInterface +} + +func (scaler *ReplicaSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error { + rs, err := scaler.c.ReplicaSets(namespace).Get(name) + if err != nil { + return ScaleError{ScaleGetFailure, "Unknown", err} + } + if preconditions != nil { + if err := preconditions.ValidateReplicaSet(rs); err != nil { + return err + } + } + rs.Spec.Replicas = int(newSize) + // TODO: do retry on 409 errors here? + if _, err := scaler.c.ReplicaSets(namespace).Update(rs); err != nil { + if errors.IsInvalid(err) { + return ScaleError{ScaleUpdateInvalidFailure, rs.ResourceVersion, err} + } + return ScaleError{ScaleUpdateFailure, rs.ResourceVersion, err} + } + return nil +} + +// Scale updates a ReplicaSet to a new size, with optional precondition check (if preconditions is +// not nil), optional retries (if retry is not nil), and then optionally waits for its replica +// count to reach the new value (if wait is not nil). 
+func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error { + if preconditions == nil { + preconditions = &ScalePrecondition{-1, ""} + } + if retry == nil { + // Make it try only once, immediately + retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond} + } + cond := ScaleCondition(scaler, preconditions, namespace, name, newSize) + if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil { + return err + } + if waitForReplicas != nil { + rs, err := scaler.c.ReplicaSets(namespace).Get(name) + if err != nil { + return err + } + return wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout, + client.ReplicaSetHasDesiredReplicas(scaler.c, rs)) + } + return nil +} + // ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise. func (precondition *ScalePrecondition) ValidateJob(job *extensions.Job) error { if precondition.Size != -1 && job.Spec.Parallelism == nil { diff --git a/pkg/kubectl/stop.go b/pkg/kubectl/stop.go index 9f4a406e20e..a430089b878 100644 --- a/pkg/kubectl/stop.go +++ b/pkg/kubectl/stop.go @@ -63,6 +63,9 @@ func ReaperFor(kind unversioned.GroupKind, c client.Interface) (Reaper, error) { case api.Kind("ReplicationController"): return &ReplicationControllerReaper{c, Interval, Timeout}, nil + case extensions.Kind("ReplicaSet"): + return &ReplicaSetReaper{c, Interval, Timeout}, nil + case extensions.Kind("DaemonSet"): return &DaemonSetReaper{c, Interval, Timeout}, nil @@ -87,6 +90,10 @@ type ReplicationControllerReaper struct { client.Interface pollInterval, timeout time.Duration } +type ReplicaSetReaper struct { + client.Interface + pollInterval, timeout time.Duration +} type DaemonSetReaper struct { client.Interface pollInterval, timeout time.Duration @@ -190,6 +197,81 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout return nil } +// TODO(madhusudancs): Implement it when controllerRef is implemented - https://github.com/kubernetes/kubernetes/issues/2210 +// getOverlappingReplicaSets finds ReplicaSets that this ReplicaSet overlaps, as well as ReplicaSets overlapping this ReplicaSet. +func getOverlappingReplicaSets(c client.ReplicaSetInterface, rs *extensions.ReplicaSet) ([]extensions.ReplicaSet, []extensions.ReplicaSet, error) { + var overlappingRSs, exactMatchRSs []extensions.ReplicaSet + return overlappingRSs, exactMatchRSs, nil +} + +func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { + rsc := reaper.Extensions().ReplicaSets(namespace) + scaler, err := ScalerFor(extensions.Kind("ReplicaSet"), *reaper) + if err != nil { + return err + } + rs, err := rsc.Get(name) + if err != nil { + return err + } + if timeout == 0 { + timeout = Timeout + time.Duration(10*rs.Spec.Replicas)*time.Second + } + + // The ReplicaSet controller will try and detect all matching ReplicaSets + // for a pod's labels, and only sync the oldest one. This means if we have + // a pod with labels [(k1: v1), (k2: v2)] and two ReplicaSets: rs1 with + // selector [(k1=v1)], and rs2 with selector [(k1=v1),(k2=v2)], the + // ReplicaSet controller will sync the older of the two ReplicaSets. 
+ // + // If there are ReplicaSets with a superset of labels, eg: + // deleting: (k1=v1), superset: (k2=v2, k1=v1) + // - It isn't safe to delete the ReplicaSet because there could be a pod + // with labels (k1=v1) that isn't managed by the superset ReplicaSet. + // We can't scale it down either, because there could be a pod + // (k2=v2, k1=v1) that it deletes causing a fight with the superset + // ReplicaSet. + // If there are ReplicaSets with a subset of labels, eg: + // deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3) + // - Even if it's safe to delete this ReplicaSet without a scale down because + // all its pods are being controlled by the subset ReplicaSet, the code + // returns an error. + + // In theory, creating overlapping ReplicaSets is user error, so the loop below + // tries to account for this logic only in the common case, where we end up + // with multiple ReplicaSets that have an exact match on selectors. + + // TODO(madhusudancs): Re-evaluate again when controllerRef is implemented - + // https://github.com/kubernetes/kubernetes/issues/2210 + overlappingRSs, exactMatchRSs, err := getOverlappingReplicaSets(rsc, rs) + if err != nil { + return fmt.Errorf("error getting ReplicaSets: %v", err) + } + + if len(overlappingRSs) > 0 { + var names []string + for _, overlappingRS := range overlappingRSs { + names = append(names, overlappingRS.Name) + } + return fmt.Errorf( + "Detected overlapping ReplicaSets for ReplicaSet %v: %v, please manage deletion individually with --cascade=false.", + rs.Name, strings.Join(names, ",")) + } + if len(exactMatchRSs) == 0 { + // No overlapping ReplicaSets. + retry := NewRetryParams(reaper.pollInterval, reaper.timeout) + waitForReplicas := NewRetryParams(reaper.pollInterval, timeout) + if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil { + return err + } + } + + if err := rsc.Delete(name, gracePeriod); err != nil { + return err + } + return nil +} + func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error { ds, err := reaper.Extensions().DaemonSets(namespace).Get(name) if err != nil { diff --git a/pkg/kubectl/stop_test.go b/pkg/kubectl/stop_test.go index 7fbb53b1ddc..5540815b347 100644 --- a/pkg/kubectl/stop_test.go +++ b/pkg/kubectl/stop_test.go @@ -264,6 +264,116 @@ func TestReplicationControllerStop(t *testing.T) { } } +func TestReplicaSetStop(t *testing.T) { + name := "foo" + ns := "default" + tests := []struct { + Name string + Objs []runtime.Object + StopError error + ExpectedActions []string + }{ + { + Name: "OnlyOneRS", + Objs: []runtime.Object{ + &extensions.ReplicaSet{ // GET + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: 0, + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}}, + }, + }, + &extensions.ReplicaSetList{ // LIST + Items: []extensions.ReplicaSet{ + { + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: 0, + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}}, + }, + }, + }, + }, + }, + StopError: nil, + ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"}, + }, + { + Name: "NoOverlapping", + Objs: []runtime.Object{ + &extensions.ReplicaSet{ // GET + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: 0, + Selector: 
&unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}}, + }, + }, + &extensions.ReplicaSetList{ // LIST + Items: []extensions.ReplicaSet{ + { + ObjectMeta: api.ObjectMeta{ + Name: "baz", + Namespace: ns, + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: 0, + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k3": "v3"}}, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: name, + Namespace: ns, + }, + Spec: extensions.ReplicaSetSpec{ + Replicas: 0, + Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}}, + }, + }, + }, + }, + }, + StopError: nil, + ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"}, + }, + // TODO: Implement tests for overlapping replica sets, similar to replication controllers, + // when the overlapping checks are implemented for replica sets. + } + + for _, test := range tests { + fake := testclient.NewSimpleFake(test.Objs...) + reaper := ReplicaSetReaper{fake, time.Millisecond, time.Millisecond} + err := reaper.Stop(ns, name, 0, nil) + if !reflect.DeepEqual(err, test.StopError) { + t.Errorf("%s unexpected error: %v", test.Name, err) + continue + } + + actions := fake.Actions() + if len(actions) != len(test.ExpectedActions) { + t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions)) + continue + } + for i, verb := range test.ExpectedActions { + if actions[i].GetResource() != "replicasets" { + t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb) + } + if actions[i].GetVerb() != verb { + t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb) + } + } + } +} + func TestJobStop(t *testing.T) { name := "foo" ns := "default" diff --git a/pkg/master/master.go b/pkg/master/master.go index 70a8a3ac848..ed78f246e67 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -59,6 +59,7 @@ import ( podetcd "k8s.io/kubernetes/pkg/registry/pod/etcd" pspetcd "k8s.io/kubernetes/pkg/registry/podsecuritypolicy/etcd" podtemplateetcd "k8s.io/kubernetes/pkg/registry/podtemplate/etcd" + replicasetetcd "k8s.io/kubernetes/pkg/registry/replicaset/etcd" resourcequotaetcd "k8s.io/kubernetes/pkg/registry/resourcequota/etcd" secretetcd "k8s.io/kubernetes/pkg/registry/secret/etcd" "k8s.io/kubernetes/pkg/registry/service" @@ -640,6 +641,11 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage { podSecurityPolicyStorage := pspetcd.NewREST(dbClient("podsecuritypolicy"), storageDecorator) storage["podSecurityPolicies"] = podSecurityPolicyStorage } + if isEnabled("replicasets") { + replicaSetStorage := replicasetetcd.NewStorage(dbClient("replicasets"), storageDecorator) + storage["replicasets"] = replicaSetStorage.ReplicaSet + storage["replicasets/status"] = replicaSetStorage.Status + } return storage }
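Note: a rough sketch of how the kubectl plumbing added above is meant to be driven (not part of this patch). It assumes an already-constructed client.Interface; the helper function name and the namespace/name arguments are illustrative only, and the import paths are the ones used elsewhere in this tree:

```go
package example

import (
	"time"

	"k8s.io/kubernetes/pkg/apis/extensions"
	client "k8s.io/kubernetes/pkg/client/unversioned"
	"k8s.io/kubernetes/pkg/kubectl"
)

// resizeThenDeleteRS drives the ReplicaSet scaler and reaper wired up in this
// patch, roughly the way the kubectl scale/delete code paths will drive them.
func resizeThenDeleteRS(c client.Interface, namespace, name string) error {
	scaler, err := kubectl.ScalerFor(extensions.Kind("ReplicaSet"), c)
	if err != nil {
		return err
	}
	// Scale to 3 replicas: no preconditions, default one-shot retry, and no
	// wait for the replica count to settle.
	if err := scaler.Scale(namespace, name, 3, nil, nil, nil); err != nil {
		return err
	}

	reaper, err := kubectl.ReaperFor(extensions.Kind("ReplicaSet"), c)
	if err != nil {
		return err
	}
	// Stop scales the ReplicaSet down to zero and then deletes it; a zero
	// timeout lets the reaper derive one from the current replica count.
	return reaper.Stop(namespace, name, 0*time.Second, nil)
}
```

ReaperFor(extensions.Kind("ReplicaSet"), c) returns the ReplicaSetReaper added in pkg/kubectl/stop.go, so Stop first scales the ReplicaSet to zero through the same ReplicaSetScaler and then deletes it, which is the get/update/delete sequence asserted by TestReplicaSetStop.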