Link all the ReplicaSet controller boilerplate together.

1. Enable the ReplicaSet controller in the controller manager.
2. Enable ReplicaSet etcd storage in the master package.
3. Add ReplicaSet support to kubectl commands.
Parent: fcf9c4a1e4
Commit: 4f9b8b2bfc
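In short: the controller manager constructs and runs the new controller, the master registers ReplicaSet REST storage under the extensions group, and kubectl's shortcut, scale, and stop paths learn about the resource. The controller wiring reduces to a couple of lines; a minimal sketch (rsClient is a hypothetical, pre-built clientset; the real code in the StartControllers hunk below builds it inline):

```go
// Sketch of step (1): run the ReplicaSet controller with the configured
// concurrency. BurstReplicas, ResyncPeriod, s, and util.NeverStop are the
// names used by the surrounding controller-manager code in this diff.
rsController := replicaset.NewReplicaSetController(rsClient, ResyncPeriod(s), replicaset.BurstReplicas)
go rsController.Run(s.ConcurrentRSSyncs, util.NeverStop)
```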
@@ -53,6 +53,7 @@ import (
 	persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume"
 	"k8s.io/kubernetes/pkg/controller/podautoscaler"
 	"k8s.io/kubernetes/pkg/controller/podautoscaler/metrics"
+	replicaset "k8s.io/kubernetes/pkg/controller/replicaset"
 	replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
 	resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
 	routecontroller "k8s.io/kubernetes/pkg/controller/route"
@@ -278,6 +279,12 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 		go deployment.NewDeploymentController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "deployment-controller")), ResyncPeriod(s)).
 			Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
 	}
+
+	if containsResource(resources, "replicasets") {
+		glog.Infof("Starting ReplicaSet controller")
+		go replicaset.NewReplicaSetController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "replicaset-controller")), ResyncPeriod(s), replicaset.BurstReplicas).
+			Run(s.ConcurrentRSSyncs, util.NeverStop)
+	}
 	}
 
 	volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)
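The `containsResource` guard used above is this file's existing discovery helper, not something added here; a hedged reconstruction of its likely shape, assuming `resources` is the discovered resource list for the extensions group:

```go
// Hypothetical sketch (not part of this diff): report whether the discovered
// extensions group serves the named resource, e.g. "replicasets".
func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
	if resources == nil {
		return false
	}
	for _, resource := range resources.APIResources {
		if resource.Name == resourceName {
			return true
		}
	}
	return false
}
```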
@@ -39,6 +39,7 @@ type CMServer struct {
 	CloudConfigFile              string
 	ConcurrentEndpointSyncs      int
 	ConcurrentRCSyncs            int
+	ConcurrentRSSyncs            int
 	ConcurrentDSCSyncs           int
 	ConcurrentJobSyncs           int
 	ConcurrentResourceQuotaSyncs int
@@ -99,6 +100,7 @@ func NewCMServer() *CMServer {
 		Address:                      net.ParseIP("0.0.0.0"),
 		ConcurrentEndpointSyncs:      5,
 		ConcurrentRCSyncs:            5,
+		ConcurrentRSSyncs:            5,
 		ConcurrentDSCSyncs:           2,
 		ConcurrentJobSyncs:           5,
 		ConcurrentResourceQuotaSyncs: 5,
@@ -142,6 +144,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
 	fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
 	fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
+	fs.IntVar(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
 	fs.IntVar(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
 	fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load")
 	fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers")
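Taken together with the struct field and default above, the new flag round-trips like this; a small sketch (the flag-set name is arbitrary, error handling elided):

```go
// Sketch: --concurrent-replicaset-syncs flows into CMServer.ConcurrentRSSyncs.
s := NewCMServer() // ConcurrentRSSyncs defaults to 5
fs := pflag.NewFlagSet("kube-controller-manager", pflag.ExitOnError)
s.AddFlags(fs)
fs.Parse([]string{"--concurrent-replicaset-syncs=10"})
// s.ConcurrentRSSyncs is now 10, and StartControllers passes it to Run.
```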
@@ -63,6 +63,7 @@ kube-controller-manager
       --cluster-name="kubernetes": The instance prefix for the cluster
       --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load
       --concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load
+      --concurrent-replicaset-syncs=5: The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
       --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load
       --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
       --deleting-pods-burst=10: Number of nodes on which pods are deleted in a burst in case of node failure. For more details look into RateLimiter.
@@ -47,6 +47,7 @@ cluster-name
 cluster-tag
 concurrent-deployment-syncs
 concurrent-endpoint-syncs
+concurrent-replicaset-syncs
 concurrent-resource-quota-syncs
 config-sync-period
 configure-cbr0
@@ -234,6 +234,12 @@ func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions,
 				return estimate, err
 			}
 		}
+		if containsResource(resources, "replicasets") {
+			err = deleteReplicaSets(kubeClient.Extensions(), namespace)
+			if err != nil {
+				return estimate, err
+			}
+		}
 	}
 	return estimate, nil
 }
@@ -538,6 +544,20 @@ func deleteIngress(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
 	return nil
 }
 
+func deleteReplicaSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error {
+	items, err := expClient.ReplicaSets(ns).List(api.ListOptions{})
+	if err != nil {
+		return err
+	}
+	for i := range items.Items {
+		err := expClient.ReplicaSets(ns).Delete(items.Items[i].Name, nil)
+		if err != nil && !errors.IsNotFound(err) {
+			return err
+		}
+	}
+	return nil
+}
+
 // TODO: this is duplicated logic. Move it somewhere central?
 func containsVersion(versions *unversioned.APIVersions, version string) bool {
 	for ix := range versions.Versions {
pkg/controller/replicaset/options/options.go (new file, 35 lines)
@@ -0,0 +1,35 @@
+/*
+Copyright 2016 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package options
+
+import (
+	"github.com/spf13/pflag"
+)
+
+type ReplicasetControllerOptions struct {
+	ConcurrentRSSyncs int
+}
+
+func NewReplicasetControllerOptions() ReplicasetControllerOptions {
+	return ReplicasetControllerOptions{
+		ConcurrentRSSyncs: 5,
+	}
+}
+
+func (o *ReplicasetControllerOptions) AddFlags(fs *pflag.FlagSet) {
+	fs.IntVar(&o.ConcurrentRSSyncs, "concurrent-replicaset-syncs", o.ConcurrentRSSyncs, "The number of replicasets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
+}
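Nothing else in this commit imports the new options package yet, so presumably a follow-up moves the controller manager onto it. Under that assumption, standalone usage would look like:

```go
// Sketch: register and parse the ReplicaSet controller's flags in isolation.
fs := pflag.NewFlagSet("replicaset-controller", pflag.ExitOnError)
opts := NewReplicasetControllerOptions() // ConcurrentRSSyncs: 5
opts.AddFlags(fs)
fs.Parse([]string{"--concurrent-replicaset-syncs=8"})
// opts.ConcurrentRSSyncs is now 8.
```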
@@ -118,6 +118,7 @@ func expandResourceShortcut(resource unversioned.GroupVersionResource) unversioned.GroupVersionResource {
 		"pv":    api.SchemeGroupVersion.WithResource("persistentvolumes"),
 		"quota": api.SchemeGroupVersion.WithResource("resourcequotas"),
 		"rc":    api.SchemeGroupVersion.WithResource("replicationcontrollers"),
+		"rs":    extensions.SchemeGroupVersion.WithResource("replicasets"),
 		"svc":   api.SchemeGroupVersion.WithResource("services"),
 	}
 	if expanded, ok := shortForms[resource.Resource]; ok {
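With the `"rs"` entry in place, the shortcut expands to the extensions-group resource, which is what lets `kubectl get rs` list ReplicaSets; roughly:

```go
// Sketch: expanding the new short form.
gvr := expandResourceShortcut(unversioned.GroupVersionResource{Resource: "rs"})
// gvr now names the "replicasets" resource in the extensions group.
```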
@@ -44,6 +44,8 @@ func ScalerFor(kind unversioned.GroupKind, c client.Interface) (Scaler, error) {
 	switch kind {
 	case api.Kind("ReplicationController"):
 		return &ReplicationControllerScaler{c}, nil
+	case extensions.Kind("ReplicaSet"):
+		return &ReplicaSetScaler{c.Extensions()}, nil
 	case extensions.Kind("Job"):
 		return &JobScaler{c.Extensions()}, nil
 	case extensions.Kind("Deployment"):
@@ -186,6 +188,68 @@ func (scaler *ReplicationControllerScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
 	return nil
 }
 
+// ValidateReplicaSet ensures that the preconditions match. Returns nil if they are valid, an error otherwise
+func (precondition *ScalePrecondition) ValidateReplicaSet(replicaSet *extensions.ReplicaSet) error {
+	if precondition.Size != -1 && replicaSet.Spec.Replicas != precondition.Size {
+		return PreconditionError{"replicas", strconv.Itoa(precondition.Size), strconv.Itoa(replicaSet.Spec.Replicas)}
+	}
+	if len(precondition.ResourceVersion) != 0 && replicaSet.ResourceVersion != precondition.ResourceVersion {
+		return PreconditionError{"resource version", precondition.ResourceVersion, replicaSet.ResourceVersion}
+	}
+	return nil
+}
+
+type ReplicaSetScaler struct {
+	c client.ExtensionsInterface
+}
+
+func (scaler *ReplicaSetScaler) ScaleSimple(namespace, name string, preconditions *ScalePrecondition, newSize uint) error {
+	rs, err := scaler.c.ReplicaSets(namespace).Get(name)
+	if err != nil {
+		return ScaleError{ScaleGetFailure, "Unknown", err}
+	}
+	if preconditions != nil {
+		if err := preconditions.ValidateReplicaSet(rs); err != nil {
+			return err
+		}
+	}
+	rs.Spec.Replicas = int(newSize)
+	// TODO: do retry on 409 errors here?
+	if _, err := scaler.c.ReplicaSets(namespace).Update(rs); err != nil {
+		if errors.IsInvalid(err) {
+			return ScaleError{ScaleUpdateInvalidFailure, rs.ResourceVersion, err}
+		}
+		return ScaleError{ScaleUpdateFailure, rs.ResourceVersion, err}
+	}
+	return nil
+}
+
+// Scale updates a ReplicaSet to a new size, with optional precondition check (if preconditions is
+// not nil), optional retries (if retry is not nil), and then optionally waits for its replica
+// count to reach the new value (if wait is not nil).
+func (scaler *ReplicaSetScaler) Scale(namespace, name string, newSize uint, preconditions *ScalePrecondition, retry, waitForReplicas *RetryParams) error {
+	if preconditions == nil {
+		preconditions = &ScalePrecondition{-1, ""}
+	}
+	if retry == nil {
+		// Make it try only once, immediately
+		retry = &RetryParams{Interval: time.Millisecond, Timeout: time.Millisecond}
+	}
+	cond := ScaleCondition(scaler, preconditions, namespace, name, newSize)
+	if err := wait.Poll(retry.Interval, retry.Timeout, cond); err != nil {
+		return err
+	}
+	if waitForReplicas != nil {
+		rs, err := scaler.c.ReplicaSets(namespace).Get(name)
+		if err != nil {
+			return err
+		}
+		return wait.Poll(waitForReplicas.Interval, waitForReplicas.Timeout,
+			client.ReplicaSetHasDesiredReplicas(scaler.c, rs))
+	}
+	return nil
+}
+
 // ValidateJob ensures that the preconditions match. Returns nil if they are valid, an error otherwise.
 func (precondition *ScalePrecondition) ValidateJob(job *extensions.Job) error {
 	if precondition.Size != -1 && job.Spec.Parallelism == nil {
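A hedged usage sketch for the new scaler path, assuming `c` is a configured `client.Interface` and "frontend" is a hypothetical ReplicaSet in the "default" namespace:

```go
// Sketch: scale a ReplicaSet to 5 replicas, but only if its current
// size is 3 (ScalePrecondition holds {Size, ResourceVersion}).
scaler, err := ScalerFor(extensions.Kind("ReplicaSet"), c)
if err != nil {
	return err
}
preconditions := &ScalePrecondition{Size: 3, ResourceVersion: ""}
retry := NewRetryParams(time.Second, time.Minute)
if err := scaler.Scale("default", "frontend", 5, preconditions, retry, nil); err != nil {
	return err
}
```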
@@ -63,6 +63,9 @@ func ReaperFor(kind unversioned.GroupKind, c client.Interface) (Reaper, error) {
 	case api.Kind("ReplicationController"):
 		return &ReplicationControllerReaper{c, Interval, Timeout}, nil
 
+	case extensions.Kind("ReplicaSet"):
+		return &ReplicaSetReaper{c, Interval, Timeout}, nil
+
 	case extensions.Kind("DaemonSet"):
 		return &DaemonSetReaper{c, Interval, Timeout}, nil
 
@@ -87,6 +90,10 @@ type ReplicationControllerReaper struct {
 	client.Interface
 	pollInterval, timeout time.Duration
 }
+type ReplicaSetReaper struct {
+	client.Interface
+	pollInterval, timeout time.Duration
+}
 type DaemonSetReaper struct {
 	client.Interface
 	pollInterval, timeout time.Duration
@@ -190,6 +197,81 @@ func (reaper *ReplicationControllerReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
 	return nil
 }
 
+// TODO(madhusudancs): Implement it when controllerRef is implemented - https://github.com/kubernetes/kubernetes/issues/2210
+// getOverlappingReplicaSets finds ReplicaSets that this ReplicaSet overlaps, as well as ReplicaSets overlapping this ReplicaSet.
+func getOverlappingReplicaSets(c client.ReplicaSetInterface, rs *extensions.ReplicaSet) ([]extensions.ReplicaSet, []extensions.ReplicaSet, error) {
+	var overlappingRSs, exactMatchRSs []extensions.ReplicaSet
+	return overlappingRSs, exactMatchRSs, nil
+}
+
+func (reaper *ReplicaSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
+	rsc := reaper.Extensions().ReplicaSets(namespace)
+	scaler, err := ScalerFor(extensions.Kind("ReplicaSet"), *reaper)
+	if err != nil {
+		return err
+	}
+	rs, err := rsc.Get(name)
+	if err != nil {
+		return err
+	}
+	if timeout == 0 {
+		timeout = Timeout + time.Duration(10*rs.Spec.Replicas)*time.Second
+	}
+
+	// The ReplicaSet controller will try and detect all matching ReplicaSets
+	// for a pod's labels, and only sync the oldest one. This means if we have
+	// a pod with labels [(k1: v1), (k2: v2)] and two ReplicaSets: rs1 with
+	// selector [(k1=v1)], and rs2 with selector [(k1=v1),(k2=v2)], the
+	// ReplicaSet controller will sync the older of the two ReplicaSets.
+	//
+	// If there are ReplicaSets with a superset of labels, eg:
+	// deleting: (k1=v1), superset: (k2=v2, k1=v1)
+	//  - It isn't safe to delete the ReplicaSet because there could be a pod
+	//    with labels (k1=v1) that isn't managed by the superset ReplicaSet.
+	//    We can't scale it down either, because there could be a pod
+	//    (k2=v2, k1=v1) that it deletes causing a fight with the superset
+	//    ReplicaSet.
+	// If there are ReplicaSets with a subset of labels, eg:
+	// deleting: (k2=v2, k1=v1), subset: (k1=v1), superset: (k2=v2, k1=v1, k3=v3)
+	//  - Even if it's safe to delete this ReplicaSet without a scale down because
+	//    all its pods are being controlled by the subset ReplicaSet the code
+	//    returns an error.
+
+	// In theory, creating overlapping ReplicaSets is user error, so the loop below
+	// tries to account for this logic only in the common case, where we end up
+	// with multiple ReplicaSets that have an exact match on selectors.

+	// TODO(madhusudancs): Re-evaluate again when controllerRef is implemented -
+	// https://github.com/kubernetes/kubernetes/issues/2210
+	overlappingRSs, exactMatchRSs, err := getOverlappingReplicaSets(rsc, rs)
+	if err != nil {
+		return fmt.Errorf("error getting ReplicaSets: %v", err)
+	}
+
+	if len(overlappingRSs) > 0 {
+		var names []string
+		for _, overlappingRS := range overlappingRSs {
+			names = append(names, overlappingRS.Name)
+		}
+		return fmt.Errorf(
+			"Detected overlapping ReplicaSets for ReplicaSet %v: %v, please manage deletion individually with --cascade=false.",
+			rs.Name, strings.Join(names, ","))
+	}
+	if len(exactMatchRSs) == 0 {
+		// No overlapping ReplicaSets.
+		retry := NewRetryParams(reaper.pollInterval, reaper.timeout)
+		waitForReplicas := NewRetryParams(reaper.pollInterval, timeout)
+		if err = scaler.Scale(namespace, name, 0, nil, retry, waitForReplicas); err != nil {
+			return err
+		}
+	}
+
+	if err := rsc.Delete(name, gracePeriod); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) error {
 	ds, err := reaper.Extensions().DaemonSets(namespace).Get(name)
 	if err != nil {
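And the matching reaper path (again `c` and "frontend" are assumptions, mirroring the test below). A zero timeout lets Stop derive one from the replica count, and a nil gracePeriod uses default delete options:

```go
// Sketch: scale the ReplicaSet down to zero, then delete it.
reaper, err := ReaperFor(extensions.Kind("ReplicaSet"), c)
if err != nil {
	return err
}
if err := reaper.Stop("default", "frontend", 0, nil); err != nil {
	return err
}
```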
@@ -264,6 +264,116 @@ func TestReplicationControllerStop(t *testing.T) {
 	}
 }
 
+func TestReplicaSetStop(t *testing.T) {
+	name := "foo"
+	ns := "default"
+	tests := []struct {
+		Name            string
+		Objs            []runtime.Object
+		StopError       error
+		ExpectedActions []string
+	}{
+		{
+			Name: "OnlyOneRS",
+			Objs: []runtime.Object{
+				&extensions.ReplicaSet{ // GET
+					ObjectMeta: api.ObjectMeta{
+						Name:      name,
+						Namespace: ns,
+					},
+					Spec: extensions.ReplicaSetSpec{
+						Replicas: 0,
+						Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
+					},
+				},
+				&extensions.ReplicaSetList{ // LIST
+					Items: []extensions.ReplicaSet{
+						{
+							ObjectMeta: api.ObjectMeta{
+								Name:      name,
+								Namespace: ns,
+							},
+							Spec: extensions.ReplicaSetSpec{
+								Replicas: 0,
+								Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
+							},
+						},
+					},
+				},
+			},
+			StopError:       nil,
+			ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"},
+		},
+		{
+			Name: "NoOverlapping",
+			Objs: []runtime.Object{
+				&extensions.ReplicaSet{ // GET
+					ObjectMeta: api.ObjectMeta{
+						Name:      name,
+						Namespace: ns,
+					},
+					Spec: extensions.ReplicaSetSpec{
+						Replicas: 0,
+						Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
+					},
+				},
+				&extensions.ReplicaSetList{ // LIST
+					Items: []extensions.ReplicaSet{
+						{
+							ObjectMeta: api.ObjectMeta{
+								Name:      "baz",
+								Namespace: ns,
+							},
+							Spec: extensions.ReplicaSetSpec{
+								Replicas: 0,
+								Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k3": "v3"}},
+							},
+						},
+						{
+							ObjectMeta: api.ObjectMeta{
+								Name:      name,
+								Namespace: ns,
+							},
+							Spec: extensions.ReplicaSetSpec{
+								Replicas: 0,
+								Selector: &unversioned.LabelSelector{MatchLabels: map[string]string{"k1": "v1"}},
+							},
+						},
+					},
+				},
+			},
+			StopError:       nil,
+			ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"},
+		},
+		// TODO: Implement tests for overlapping replica sets, similar to replication controllers,
+		// when the overlapping checks are implemented for replica sets.
+	}
+
+	for _, test := range tests {
+		fake := testclient.NewSimpleFake(test.Objs...)
+		reaper := ReplicaSetReaper{fake, time.Millisecond, time.Millisecond}
+		err := reaper.Stop(ns, name, 0, nil)
+		if !reflect.DeepEqual(err, test.StopError) {
+			t.Errorf("%s unexpected error: %v", test.Name, err)
+			continue
+		}
+
+		actions := fake.Actions()
+		if len(actions) != len(test.ExpectedActions) {
+			t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
+			continue
+		}
+		for i, verb := range test.ExpectedActions {
+			if actions[i].GetResource() != "replicasets" {
+				t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb)
+			}
+			if actions[i].GetVerb() != verb {
+				t.Errorf("%s unexpected action: %+v, expected %s-replicaSet", test.Name, actions[i], verb)
+			}
+		}
+	}
+}
+
 func TestJobStop(t *testing.T) {
 	name := "foo"
 	ns := "default"
@@ -59,6 +59,7 @@ import (
 	podetcd "k8s.io/kubernetes/pkg/registry/pod/etcd"
 	pspetcd "k8s.io/kubernetes/pkg/registry/podsecuritypolicy/etcd"
 	podtemplateetcd "k8s.io/kubernetes/pkg/registry/podtemplate/etcd"
+	replicasetetcd "k8s.io/kubernetes/pkg/registry/replicaset/etcd"
 	resourcequotaetcd "k8s.io/kubernetes/pkg/registry/resourcequota/etcd"
 	secretetcd "k8s.io/kubernetes/pkg/registry/secret/etcd"
 	"k8s.io/kubernetes/pkg/registry/service"
@@ -640,6 +641,11 @@ func (m *Master) getExtensionResources(c *Config) map[string]rest.Storage {
 		podSecurityPolicyStorage := pspetcd.NewREST(dbClient("podsecuritypolicy"), storageDecorator)
 		storage["podSecurityPolicies"] = podSecurityPolicyStorage
 	}
+	if isEnabled("replicasets") {
+		replicaSetStorage := replicasetetcd.NewStorage(dbClient("replicasets"), storageDecorator)
+		storage["replicasets"] = replicaSetStorage.ReplicaSet
+		storage["replicasets/status"] = replicaSetStorage.Status
+	}
 
 	return storage
 }
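With the storage map entries registered, the extensions group should now serve the resource and its status subresource at paths of roughly this shape (the "default" namespace is illustrative):

```
/apis/extensions/v1beta1/namespaces/default/replicasets
/apis/extensions/v1beta1/namespaces/default/replicasets/{name}/status
```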