Merge pull request #17398 from janetkuo/deployment-controller-informer

Auto commit by PR queue bot

Commit: 9e8233fc3c
@@ -74,6 +74,7 @@ type CMServer struct {
    ConcurrentDSCSyncs           int
    ConcurrentJobSyncs           int
    ConcurrentResourceQuotaSyncs int
+   ConcurrentDeploymentSyncs    int
    ServiceSyncPeriod            time.Duration
    NodeSyncPeriod               time.Duration
    ResourceQuotaSyncPeriod      time.Duration
@@ -116,6 +117,7 @@ func NewCMServer() *CMServer {
    ConcurrentDSCSyncs:           2,
    ConcurrentJobSyncs:           5,
    ConcurrentResourceQuotaSyncs: 5,
+   ConcurrentDeploymentSyncs:    5,
    ServiceSyncPeriod:            5 * time.Minute,
    NodeSyncPeriod:               10 * time.Second,
    ResourceQuotaSyncPeriod:      5 * time.Minute,
@@ -189,6 +191,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
    fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
    fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load")
    fs.IntVar(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load")
+   fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load")
    fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers")
    fs.DurationVar(&s.NodeSyncPeriod, "node-sync-period", s.NodeSyncPeriod, ""+
        "The period for syncing nodes from cloudprovider. Longer periods will result in "+
@@ -383,8 +386,8 @@ func (s *CMServer) Run(_ []string) error {

    if containsResource(resources, "deployments") {
        glog.Infof("Starting deployment controller")
-       deployment.New(clientForUserAgentOrDie(*kubeconfig, "deployment-controller")).
-           Run(s.DeploymentControllerSyncPeriod)
+       go deployment.NewDeploymentController(clientForUserAgentOrDie(*kubeconfig, "deployment-controller"), s.ResyncPeriod).
+           Run(s.ConcurrentDeploymentSyncs, util.NeverStop)
    }
}
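For context, a minimal sketch of the wiring implied by the controller-manager change above: the controller is now built from a client plus a resync-period function, and Run takes a worker count and a stop channel instead of a sync period. This is an illustrative reconstruction based only on the signatures visible in this diff; the helper name startDeploymentController, the package name, and the assumed import path of the deployment controller package are not taken from the repository.

// Hypothetical wiring sketch; names and import path are assumptions, not code from the PR.
package wiring

import (
    client "k8s.io/kubernetes/pkg/client/unversioned"
    "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/controller/deployment" // assumed path of the deployment controller package
    "k8s.io/kubernetes/pkg/util"
)

func startDeploymentController(c client.Interface, resyncPeriod controller.ResyncPeriodFunc, workers int) {
    // Run blocks until the stop channel closes, so it is started on its own goroutine,
    // mirroring the call in CMServer.Run above.
    go deployment.NewDeploymentController(c, resyncPeriod).Run(workers, util.NeverStop)
}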
@@ -60,6 +60,7 @@ kube-controller-manager
      --cloud-provider="": The provider for cloud services. Empty string for no provider.
      --cluster-cidr=<nil>: CIDR Range for Pods in cluster.
      --cluster-name="kubernetes": The instance prefix for the cluster
+     --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load
      --concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load
      --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load
      --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
@@ -640,7 +640,7 @@ runTests() {
  # Post-Condition: hpa "frontend" has configuration annotation
  [[ "$(kubectl get hpa frontend -o yaml "${kube_flags[@]}" | grep kubectl.kubernetes.io/last-applied-configuration)" ]]
  # Clean up
-  kubectl delete rc,hpa frontend
+  kubectl delete rc,hpa frontend "${kube_flags[@]}"

  ## kubectl apply should create the resource that doesn't exist yet
  # Pre-Condition: no POD is running
@@ -658,19 +658,20 @@ runTests() {
  # Pre-Condition: no Job is running
  kube::test::get_object_assert jobs "{{range.items}}{{$id_field}}:{{end}}" ''
  # Command
-  kubectl run pi --image=perl --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)'
+  kubectl run pi --image=perl --restart=OnFailure -- perl -Mbignum=bpi -wle 'print bpi(20)' "${kube_flags[@]}"
  # Post-Condition: Job "pi" is created
  kube::test::get_object_assert jobs "{{range.items}}{{$id_field}}:{{end}}" 'pi:'
  # Clean up
-  kubectl delete jobs pi
+  kubectl delete jobs pi "${kube_flags[@]}"
  # Pre-Condition: no Deployment is running
  kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" ''
  # Command
-  kubectl run nginx --image=nginx --generator=deployment/v1beta1
+  kubectl run nginx --image=nginx --generator=deployment/v1beta1 "${kube_flags[@]}"
  # Post-Condition: Deployment "nginx" is created
  kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx:'
  # Clean up
-  kubectl delete deployment nginx
+  kubectl delete deployment nginx "${kube_flags[@]}"
+  kubectl delete rc -l deployment.kubernetes.io/podTemplateHash "${kube_flags[@]}"

  ##############
  # Namespaces #
@@ -1050,11 +1051,12 @@ __EOF__
  kubectl create -f examples/extensions/deployment.yaml "${kube_flags[@]}"
  kube::test::get_object_assert deployment "{{range.items}}{{$id_field}}:{{end}}" 'nginx-deployment:'
  # autoscale 2~3 pods, default CPU utilization (80%)
-  kubectl autoscale deployment nginx-deployment "${kube_flags[@]}" --min=2 --max=3
+  kubectl-with-retry autoscale deployment nginx-deployment "${kube_flags[@]}" --min=2 --max=3
  kube::test::get_object_assert 'hpa nginx-deployment' "{{$hpa_min_field}} {{$hpa_max_field}} {{$hpa_cpu_field}}" '2 3 80'
  # Clean up
  kubectl delete hpa nginx-deployment "${kube_flags[@]}"
  kubectl delete deployment nginx-deployment "${kube_flags[@]}"
+  kubectl delete rc -l deployment.kubernetes.io/podTemplateHash "${kube_flags[@]}"

  ######################
  # Multiple Resources #
@@ -45,6 +45,7 @@ cluster-dns
cluster-domain
cluster-name
cluster-tag
+concurrent-deployment-syncs
concurrent-endpoint-syncs
concurrent-resource-quota-syncs
config-sync-period
pkg/client/cache/listers.go (vendored): 70 changed lines
@@ -167,7 +167,7 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
    var rc api.ReplicationController

    if len(pod.Labels) == 0 {
-       err = fmt.Errorf("No controllers found for pod %v because it has no labels", pod.Name)
+       err = fmt.Errorf("no controllers found for pod %v because it has no labels", pod.Name)
        return
    }

@@ -186,7 +186,61 @@ func (s *StoreToReplicationControllerLister) GetPodControllers(pod *api.Pod) (co
        controllers = append(controllers, rc)
    }
    if len(controllers) == 0 {
-       err = fmt.Errorf("Could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+       err = fmt.Errorf("could not find controller for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+   }
+   return
+}
+
+// StoreToDeploymentLister gives a store List and Exists methods. The store must contain only Deployments.
+type StoreToDeploymentLister struct {
+   Store
+}
+
+// Exists checks if the given deployment exists in the store.
+func (s *StoreToDeploymentLister) Exists(deployment *extensions.Deployment) (bool, error) {
+   _, exists, err := s.Store.Get(deployment)
+   if err != nil {
+       return false, err
+   }
+   return exists, nil
+}
+
+// StoreToDeploymentLister lists all deployments in the store.
+// TODO: converge on the interface in pkg/client
+func (s *StoreToDeploymentLister) List() (deployments []extensions.Deployment, err error) {
+   for _, c := range s.Store.List() {
+       deployments = append(deployments, *(c.(*extensions.Deployment)))
+   }
+   return deployments, nil
+}
+
+// GetDeploymentsForRC returns a list of deployments managing a replication controller. Returns an error only if no matching deployments are found.
+func (s *StoreToDeploymentLister) GetDeploymentsForRC(rc *api.ReplicationController) (deployments []extensions.Deployment, err error) {
+   var selector labels.Selector
+   var d extensions.Deployment
+
+   if len(rc.Labels) == 0 {
+       err = fmt.Errorf("no deployments found for replication controller %v because it has no labels", rc.Name)
+       return
+   }
+
+   // TODO: MODIFY THIS METHOD so that it checks for the podTemplateSpecHash label
+   for _, m := range s.Store.List() {
+       d = *m.(*extensions.Deployment)
+       if d.Namespace != rc.Namespace {
+           continue
+       }
+       labelSet := labels.Set(d.Spec.Selector)
+       selector = labels.Set(d.Spec.Selector).AsSelector()
+
+       // If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
+       if labelSet.AsSelector().Empty() || !selector.Matches(labels.Set(rc.Labels)) {
+           continue
+       }
+       deployments = append(deployments, d)
+   }
+   if len(deployments) == 0 {
+       err = fmt.Errorf("could not find deployments set for replication controller %s in namespace %s with labels: %v", rc.Name, rc.Namespace, rc.Labels)
    }
    return
}
@@ -221,7 +275,7 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
    var daemonSet extensions.DaemonSet

    if len(pod.Labels) == 0 {
-       err = fmt.Errorf("No daemon sets found for pod %v because it has no labels", pod.Name)
+       err = fmt.Errorf("no daemon sets found for pod %v because it has no labels", pod.Name)
        return
    }

@@ -243,7 +297,7 @@ func (s *StoreToDaemonSetLister) GetPodDaemonSets(pod *api.Pod) (daemonSets []ex
        daemonSets = append(daemonSets, daemonSet)
    }
    if len(daemonSets) == 0 {
-       err = fmt.Errorf("Could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+       err = fmt.Errorf("could not find daemon set for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
    }
    return
}
@@ -283,7 +337,7 @@ func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Serv
        }
    }
    if len(services) == 0 {
-       err = fmt.Errorf("Could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+       err = fmt.Errorf("could not find service for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
    }

    return
@@ -310,7 +364,7 @@ func (s *StoreToEndpointsLister) GetServiceEndpoints(svc *api.Service) (ep api.E
            return ep, nil
        }
    }
-   err = fmt.Errorf("Could not find endpoints for service: %v", svc.Name)
+   err = fmt.Errorf("could not find endpoints for service: %v", svc.Name)
    return
}

@@ -342,7 +396,7 @@ func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []extensions.Job, err
    var job extensions.Job

    if len(pod.Labels) == 0 {
-       err = fmt.Errorf("No jobs found for pod %v because it has no labels", pod.Name)
+       err = fmt.Errorf("no jobs found for pod %v because it has no labels", pod.Name)
        return
    }

@@ -359,7 +413,7 @@ func (s *StoreToJobLister) GetPodJobs(pod *api.Pod) (jobs []extensions.Job, err
        jobs = append(jobs, job)
    }
    if len(jobs) == 0 {
-       err = fmt.Errorf("Could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
+       err = fmt.Errorf("could not find jobs for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
    }
    return
}
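For readers unfamiliar with the lister pattern, the sketch below shows how the new StoreToDeploymentLister might be consumed outside the controller: wrap a cache.Store of deployments and ask which deployment selects a given RC. It mirrors the getDeploymentForRC helper added later in this PR; the package name, function name, and store setup here are illustrative assumptions, not part of the change.

// Illustrative usage sketch only; names are assumptions.
package cacheexample

import (
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/apis/extensions"
    "k8s.io/kubernetes/pkg/client/cache"
)

// deploymentForRC returns the first deployment whose selector matches the RC's labels,
// or nil when GetDeploymentsForRC reports that nothing matches.
func deploymentForRC(store cache.Store, rc *api.ReplicationController) *extensions.Deployment {
    lister := &cache.StoreToDeploymentLister{Store: store}
    deployments, err := lister.GetDeploymentsForRC(rc)
    if err != nil || len(deployments) == 0 {
        // The lister only errors when no deployment selects this RC.
        return nil
    }
    // The controller in this PR simply takes the first match; overlap handling is a TODO.
    return &deployments[0]
}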
@@ -234,7 +234,7 @@ type PodControlInterface interface {
    DeletePod(namespace string, podID string) error
}

-// RealPodControl is the default implementation of PodControllerInterface.
+// RealPodControl is the default implementation of PodControlInterface.
type RealPodControl struct {
    KubeClient client.Interface
    Recorder   record.EventRecorder
@@ -22,77 +22,380 @@ import (
    "time"

    "github.com/golang/glog"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/apis/extensions"
+   "k8s.io/kubernetes/pkg/client/cache"
    "k8s.io/kubernetes/pkg/client/record"
    client "k8s.io/kubernetes/pkg/client/unversioned"
+   "k8s.io/kubernetes/pkg/controller"
+   "k8s.io/kubernetes/pkg/controller/framework"
+   "k8s.io/kubernetes/pkg/labels"
+   "k8s.io/kubernetes/pkg/runtime"
    "k8s.io/kubernetes/pkg/util"
    deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
+   "k8s.io/kubernetes/pkg/util/workqueue"
+   "k8s.io/kubernetes/pkg/watch"
)

+const (
+   // FullDeploymentResyncPeriod means we'll attempt to recompute the required replicas
+   // of all deployments that have fulfilled their expectations at least this often.
+   // This recomputation happens based on contents in the local caches.
+   FullDeploymentResyncPeriod = 30 * time.Second
+)
+
+// DeploymentController is responsible for synchronizing Deployment objects stored
+// in the system with actual running rcs and pods.
type DeploymentController struct {
    client        client.Interface
    expClient     client.ExtensionsInterface
    eventRecorder record.EventRecorder

+   // To allow injection of syncDeployment for testing.
+   syncHandler func(dKey string) error
+
+   // A store of deployments, populated by the dController
+   dStore cache.StoreToDeploymentLister
+   // Watches changes to all deployments
+   dController *framework.Controller
+   // A store of replication controllers, populated by the rcController
+   rcStore cache.StoreToReplicationControllerLister
+   // Watches changes to all replication controllers
+   rcController *framework.Controller
+   // rcStoreSynced returns true if the RC store has been synced at least once.
+   // Added as a member to the struct to allow injection for testing.
+   rcStoreSynced func() bool
+   // A store of pods, populated by the podController
+   podStore cache.StoreToPodLister
+   // Watches changes to all pods
+   podController *framework.Controller
+   // podStoreSynced returns true if the pod store has been synced at least once.
+   // Added as a member to the struct to allow injection for testing.
+   podStoreSynced func() bool
+
+   // A TTLCache of pod creates/deletes each deployment expects to see
+   expectations controller.ControllerExpectationsInterface
+
+   // Deployments that need to be synced
+   queue *workqueue.Type
}

-func New(client client.Interface) *DeploymentController {
+// NewDeploymentController creates a new DeploymentController.
+func NewDeploymentController(client client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DeploymentController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(client.Events(""))

-   return &DeploymentController{
+   dc := &DeploymentController{
        client:        client,
        expClient:     client.Extensions(),
        eventRecorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "deployment-controller"}),
+       queue:         workqueue.New(),
+       expectations:  controller.NewControllerExpectations(),
+   }
+
+   dc.dStore.Store, dc.dController = framework.NewInformer(
+       &cache.ListWatch{
+           ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+               return dc.expClient.Deployments(api.NamespaceAll).List(options)
+           },
+           WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+               return dc.expClient.Deployments(api.NamespaceAll).Watch(options)
+           },
+       },
+       &extensions.Deployment{},
+       FullDeploymentResyncPeriod,
+       framework.ResourceEventHandlerFuncs{
+           AddFunc: dc.enqueueDeployment,
+           UpdateFunc: func(old, cur interface{}) {
+               // Resync on deployment object relist.
+               dc.enqueueDeployment(cur)
+           },
+           // This will enter the sync loop and no-op, because the deployment has been deleted from the store.
+           DeleteFunc: dc.enqueueDeployment,
+       },
+   )
+
+   dc.rcStore.Store, dc.rcController = framework.NewInformer(
+       &cache.ListWatch{
+           ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+               return dc.client.ReplicationControllers(api.NamespaceAll).List(options)
+           },
+           WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+               return dc.client.ReplicationControllers(api.NamespaceAll).Watch(options)
+           },
+       },
+       &api.ReplicationController{},
+       resyncPeriod(),
+       framework.ResourceEventHandlerFuncs{
+           AddFunc:    dc.addRC,
+           UpdateFunc: dc.updateRC,
+           DeleteFunc: dc.deleteRC,
+       },
+   )
+
+   dc.podStore.Store, dc.podController = framework.NewInformer(
+       &cache.ListWatch{
+           ListFunc: func(options api.ListOptions) (runtime.Object, error) {
+               return dc.client.Pods(api.NamespaceAll).List(options)
+           },
+           WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
+               return dc.client.Pods(api.NamespaceAll).Watch(options)
+           },
+       },
+       &api.Pod{},
+       resyncPeriod(),
+       framework.ResourceEventHandlerFuncs{
+           // When pod updates (becomes ready), we need to enqueue deployment
+           UpdateFunc: dc.updatePod,
+           // When pod is deleted, we need to update deployment's expectations
+           DeleteFunc: dc.deletePod,
+       },
+   )
+
+   dc.syncHandler = dc.syncDeployment
+   dc.rcStoreSynced = dc.rcController.HasSynced
+   dc.podStoreSynced = dc.podController.HasSynced
+   return dc
+}
+
+// Run begins watching and syncing.
+func (dc *DeploymentController) Run(workers int, stopCh <-chan struct{}) {
+   defer util.HandleCrash()
+   go dc.dController.Run(stopCh)
+   go dc.rcController.Run(stopCh)
+   go dc.podController.Run(stopCh)
+   for i := 0; i < workers; i++ {
+       go util.Until(dc.worker, time.Second, stopCh)
+   }
+   <-stopCh
+   glog.Infof("Shutting down deployment controller")
+   dc.queue.ShutDown()
+}
+
+// addRC enqueues the deployment that manages an RC when the RC is created.
+func (dc *DeploymentController) addRC(obj interface{}) {
+   rc := obj.(*api.ReplicationController)
+   glog.V(4).Infof("Replication controller %s added.", rc.Name)
+   if d := dc.getDeploymentForRC(rc); rc != nil {
+       dc.enqueueDeployment(d)
    }
}

-func (d *DeploymentController) Run(syncPeriod time.Duration) {
-   go util.Until(func() {
-       errs := d.reconcileDeployments()
-       for _, err := range errs {
-           glog.Errorf("Failed to reconcile: %v", err)
-       }
-   }, syncPeriod, util.NeverStop)
+// getDeploymentForRC returns the deployment managing the given RC.
+// TODO: Surface that we are ignoring multiple deployments for a given controller.
+func (dc *DeploymentController) getDeploymentForRC(rc *api.ReplicationController) *extensions.Deployment {
+   deployments, err := dc.dStore.GetDeploymentsForRC(rc)
+   if err != nil || len(deployments) == 0 {
+       glog.V(4).Infof("Error: %v. No deployment found for replication controller %v, deployment controller will avoid syncing.", err, rc.Name)
+       return nil
+   }
+   // Because all RC's belonging to a deployment should have a unique label key,
+   // there should never be more than one deployment returned by the above method.
+   // If that happens we should probably dynamically repair the situation by ultimately
+   // trying to clean up one of the controllers, for now we just return one of the two,
+   // likely randomly.
+   return &deployments[0]
}

-func (d *DeploymentController) reconcileDeployments() []error {
-   list, err := d.expClient.Deployments(api.NamespaceAll).List(api.ListOptions{})
+// updateRC figures out what deployment(s) manage an RC when the RC is updated and
+// wake them up. If the anything of the RCs have changed, we need to awaken both
+// the old and new deployments. old and cur must be *api.ReplicationController types.
+func (dc *DeploymentController) updateRC(old, cur interface{}) {
+   if api.Semantic.DeepEqual(old, cur) {
+       // A periodic relist will send update events for all known controllers.
+       return
+   }
+   // TODO: Write a unittest for this case
+   curRC := cur.(*api.ReplicationController)
+   glog.V(4).Infof("Replication controller %s updated.", curRC.Name)
+   if d := dc.getDeploymentForRC(curRC); d != nil {
+       dc.enqueueDeployment(d)
+   }
+   // A number of things could affect the old deployment: labels changing,
+   // pod template changing, etc.
+   oldRC := old.(*api.ReplicationController)
+   if !api.Semantic.DeepEqual(oldRC, curRC) {
+       if oldD := dc.getDeploymentForRC(oldRC); oldD != nil {
+           dc.enqueueDeployment(oldD)
+       }
+   }
+}
+
+// deleteRC enqueues the deployment that manages an RC when the RC is deleted.
+// obj could be an *api.ReplicationController, or a DeletionFinalStateUnknown
+// marker item.
+func (dc *DeploymentController) deleteRC(obj interface{}) {
+   rc, ok := obj.(*api.ReplicationController)
+
+   // When a delete is dropped, the relist will notice a pod in the store not
+   // in the list, leading to the insertion of a tombstone object which contains
+   // the deleted key/value. Note that this value might be stale. If the RC
+   // changed labels the new deployment will not be woken up till the periodic resync.
+   if !ok {
+       tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+       if !ok {
+           glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a deployment recreates/updates controllers", obj, FullDeploymentResyncPeriod)
+           return
+       }
+       rc, ok = tombstone.Obj.(*api.ReplicationController)
+       if !ok {
+           glog.Errorf("Tombstone contained object that is not an rc %+v, could take up to %v before a deployment recreates/updates controllers", obj, FullDeploymentResyncPeriod)
+           return
+       }
+   }
+   glog.V(4).Infof("Replication controller %s deleted.", rc.Name)
+   if d := dc.getDeploymentForRC(rc); d != nil {
+       dc.enqueueDeployment(d)
+   }
+}
+
+// getDeploymentForPod returns the deployment managing the RC that manages the given Pod.
+// TODO: Surface that we are ignoring multiple deployments for a given Pod.
+func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
+   rcs, err := dc.rcStore.GetPodControllers(pod)
    if err != nil {
-       return []error{fmt.Errorf("error listing deployments: %v", err)}
+       glog.V(4).Infof("Error: %v. No replication controllers found for pod %v, deployment controller will avoid syncing.", err, pod.Name)
+       return nil
    }
-   errs := []error{}
-   for _, deployment := range list.Items {
-       if err := d.reconcileDeployment(&deployment); err != nil {
-           errs = append(errs, fmt.Errorf("error in reconciling deployment %s: %v", deployment.Name, err))
+   for _, rc := range rcs {
+       deployments, err := dc.dStore.GetDeploymentsForRC(&rc)
+       if err == nil && len(deployments) > 0 {
+           return &deployments[0]
        }
    }
-   return errs
+   glog.V(4).Infof("No deployments found for pod %v, deployment controller will avoid syncing.", pod.Name)
+   return nil
}

-func (d *DeploymentController) reconcileDeployment(deployment *extensions.Deployment) error {
-   switch deployment.Spec.Strategy.Type {
-   case extensions.RecreateDeploymentStrategyType:
-       return d.reconcileRecreateDeployment(*deployment)
-   case extensions.RollingUpdateDeploymentStrategyType:
-       return d.reconcileRollingUpdateDeployment(*deployment)
+// updatePod figures out what deployment(s) manage the RC that manages the Pod when the Pod
+// is updated and wake them up. If anything of the Pods have changed, we need to awaken both
+// the old and new deployments. old and cur must be *api.Pod types.
+func (dc *DeploymentController) updatePod(old, cur interface{}) {
+   if api.Semantic.DeepEqual(old, cur) {
+       return
+   }
+   curPod := cur.(*api.Pod)
+   glog.V(4).Infof("Pod %s updated.", curPod.Name)
+   if d := dc.getDeploymentForPod(curPod); d != nil {
+       dc.enqueueDeployment(d)
+   }
+   oldPod := old.(*api.Pod)
+   if !api.Semantic.DeepEqual(oldPod, curPod) {
+       if oldD := dc.getDeploymentForPod(oldPod); oldD != nil {
+           dc.enqueueDeployment(oldD)
+       }
    }
-   return fmt.Errorf("unexpected deployment strategy type: %s", deployment.Spec.Strategy.Type)
}

-func (d *DeploymentController) reconcileRecreateDeployment(deployment extensions.Deployment) error {
+// When a pod is deleted, update expectations of the controller that manages the pod.
+// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
+func (dc *DeploymentController) deletePod(obj interface{}) {
+   pod, ok := obj.(*api.Pod)
+   // When a delete is dropped, the relist will notice a pod in the store not
+   // in the list, leading to the insertion of a tombstone object which contains
+   // the deleted key/value. Note that this value might be stale. If the pod
+   // changed labels the new rc will not be woken up till the periodic resync.
+   if !ok {
+       tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+       if !ok {
+           glog.Errorf("Couldn't get object from tombstone %+v, could take up to %v before a controller recreates a replica", obj, controller.ExpectationsTimeout)
+           return
+       }
+       pod, ok = tombstone.Obj.(*api.Pod)
+       if !ok {
+           glog.Errorf("Tombstone contained object that is not a pod %+v, could take up to %v before controller recreates a replica", obj, controller.ExpectationsTimeout)
+           return
+       }
+   }
+   glog.V(4).Infof("Pod %s deleted.", pod.Name)
+   if d := dc.getDeploymentForPod(pod); d != nil {
+       dKey, err := controller.KeyFunc(d)
+       if err != nil {
+           glog.Errorf("Couldn't get key for deployment controller %#v: %v", d, err)
+           return
+       }
+       dc.expectations.DeletionObserved(dKey)
+   }
+}
+
+// obj could be an *api.Deployment, or a DeletionFinalStateUnknown marker item.
+func (dc *DeploymentController) enqueueDeployment(obj interface{}) {
+   key, err := controller.KeyFunc(obj)
+   if err != nil {
+       glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+       return
+   }
+
+   // TODO: Handle overlapping deployments better. Either disallow them at admission time or
+   // deterministically avoid syncing deployments that fight over RC's. Currently, we only
+   // ensure that the same deployment is synced for a given RC. When we periodically relist
+   // all deployments there will still be some RC instability. One way to handle this is
+   // by querying the store for all deployments that this deployment overlaps, as well as all
+   // deployments that overlap this deployments, and sorting them.
+   dc.queue.Add(key)
+}
+
+// worker runs a worker thread that just dequeues items, processes them, and marks them done.
+// It enforces that the syncHandler is never invoked concurrently with the same key.
+func (dc *DeploymentController) worker() {
+   for {
+       func() {
+           key, quit := dc.queue.Get()
+           if quit {
+               return
+           }
+           defer dc.queue.Done(key)
+           err := dc.syncHandler(key.(string))
+           if err != nil {
+               glog.Errorf("Error syncing deployment: %v", err)
+           }
+       }()
+   }
+}
+
+// syncDeployment will sync the deployment with the given key.
+// This function is not meant to be invoked concurrently with the same key.
+func (dc *DeploymentController) syncDeployment(key string) error {
+   startTime := time.Now()
+   defer func() {
+       glog.V(4).Infof("Finished syncing deployment %q (%v)", key, time.Now().Sub(startTime))
+   }()
+
+   obj, exists, err := dc.dStore.Store.GetByKey(key)
+   if err != nil {
+       glog.Infof("Unable to retrieve deployment %v from store: %v", key, err)
+       dc.queue.Add(key)
+       return err
+   }
+   if !exists {
+       glog.Infof("Deployment has been deleted %v", key)
+       dc.expectations.DeleteExpectations(key)
+       return nil
+   }
+   d := *obj.(*extensions.Deployment)
+   switch d.Spec.Strategy.Type {
+   case extensions.RecreateDeploymentStrategyType:
+       return dc.syncRecreateDeployment(d)
+   case extensions.RollingUpdateDeploymentStrategyType:
+       return dc.syncRollingUpdateDeployment(d)
+   }
+   return fmt.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
+}
+
+func (dc *DeploymentController) syncRecreateDeployment(deployment extensions.Deployment) error {
    // TODO: implement me.
    return nil
}

-func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment extensions.Deployment) error {
-   newRC, err := d.getNewRC(deployment)
+func (dc *DeploymentController) syncRollingUpdateDeployment(deployment extensions.Deployment) error {
+   newRC, err := dc.getNewRC(deployment)
    if err != nil {
        return err
    }

-   oldRCs, err := d.getOldRCs(deployment)
+   oldRCs, err := dc.getOldRCs(deployment)
    if err != nil {
        return err
    }
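The heart of this rewrite is the informer-plus-workqueue loop: every deployment, RC, and pod event is reduced to a "namespace/name" key by controller.KeyFunc, the key goes onto the workqueue, and a fixed pool of workers pops keys and hands them to syncDeployment. The helper below restates that dequeue contract in isolation; it is a simplified editorial sketch, not code from this controller, and its package and function names are made up.

// Simplified sketch of the worker's dequeue contract; not part of the PR.
package deploymentexample

import (
    "github.com/golang/glog"

    "k8s.io/kubernetes/pkg/util/workqueue"
)

// processNextWorkItem pops one key, always marks it Done, and reports whether the
// queue is still open. syncFn plays the role of dc.syncHandler in the code above.
func processNextWorkItem(queue *workqueue.Type, syncFn func(key string) error) bool {
    key, quit := queue.Get()
    if quit {
        return false
    }
    defer queue.Done(key)
    if err := syncFn(key.(string)); err != nil {
        glog.Errorf("Error syncing deployment %v: %v", key, err)
    }
    return true
}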
@@ -100,36 +403,54 @@ func (d *DeploymentController) reconcileRollingUpdateDeployment(deployment exten
    allRCs := append(oldRCs, newRC)

    // Scale up, if we can.
-   scaledUp, err := d.reconcileNewRC(allRCs, newRC, deployment)
+   scaledUp, err := dc.reconcileNewRC(allRCs, newRC, deployment)
    if err != nil {
        return err
    }
    if scaledUp {
        // Update DeploymentStatus
-       return d.updateDeploymentStatus(allRCs, newRC, deployment)
+       return dc.updateDeploymentStatus(allRCs, newRC, deployment)
    }

    // Scale down, if we can.
-   scaledDown, err := d.reconcileOldRCs(allRCs, oldRCs, newRC, deployment)
+   scaledDown, err := dc.reconcileOldRCs(allRCs, oldRCs, newRC, deployment, true)
    if err != nil {
        return err
    }
    if scaledDown {
        // Update DeploymentStatus
-       return d.updateDeploymentStatus(allRCs, newRC, deployment)
+       return dc.updateDeploymentStatus(allRCs, newRC, deployment)
    }

+   // Sync deployment status
+   totalReplicas := deploymentutil.GetReplicaCountForRCs(allRCs)
+   updatedReplicas := deploymentutil.GetReplicaCountForRCs([]*api.ReplicationController{newRC})
+   if deployment.Status.Replicas != totalReplicas || deployment.Status.UpdatedReplicas != updatedReplicas {
+       return dc.updateDeploymentStatus(allRCs, newRC, deployment)
+   }
+
    // TODO: raise an event, neither scaled up nor down.
    return nil
}

-func (d *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
-   return deploymentutil.GetOldRCs(deployment, d.client)
+func (dc *DeploymentController) getOldRCs(deployment extensions.Deployment) ([]*api.ReplicationController, error) {
+   return deploymentutil.GetOldRCsFromLists(deployment, dc.client,
+       func(namespace string, options api.ListOptions) (*api.PodList, error) {
+           podList, err := dc.podStore.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector))
+           return &podList, err
+       },
+       func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) {
+           return dc.rcStore.List()
+       })
}

// Returns an RC that matches the intent of the given deployment.
// It creates a new RC if required.
-func (d *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) {
-   existingNewRC, err := deploymentutil.GetNewRC(deployment, d.client)
+func (dc *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.ReplicationController, error) {
+   existingNewRC, err := deploymentutil.GetNewRCFromList(deployment, dc.client,
+       func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) {
+           return dc.rcStore.List()
+       })
    if err != nil || existingNewRC != nil {
        return existingNewRC, err
    }
@@ -151,21 +472,21 @@ func (d *DeploymentController) getNewRC(deployment extensions.Deployment) (*api.
            Template: &newRCTemplate,
        },
    }
-   createdRC, err := d.client.ReplicationControllers(namespace).Create(&newRC)
+   createdRC, err := dc.client.ReplicationControllers(namespace).Create(&newRC)
    if err != nil {
        return nil, fmt.Errorf("error creating replication controller: %v", err)
    }
    return createdRC, nil
}

-func (d *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
+func (dc *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
    if newRC.Spec.Replicas == deployment.Spec.Replicas {
        // Scaling not required.
        return false, nil
    }
    if newRC.Spec.Replicas > deployment.Spec.Replicas {
        // Scale down.
-       _, err := d.scaleRCAndRecordEvent(newRC, deployment.Spec.Replicas, deployment)
+       _, err := dc.scaleRCAndRecordEvent(newRC, deployment.Spec.Replicas, deployment)
        return true, err
    }
    // Check if we can scale up.
@@ -188,11 +509,12 @@ func (d *DeploymentController) reconcileNewRC(allRCs []*api.ReplicationControlle
    // Do not exceed the number of desired replicas.
    scaleUpCount = int(math.Min(float64(scaleUpCount), float64(deployment.Spec.Replicas-newRC.Spec.Replicas)))
    newReplicasCount := newRC.Spec.Replicas + scaleUpCount
-   _, err = d.scaleRCAndRecordEvent(newRC, newReplicasCount, deployment)
+   _, err = dc.scaleRCAndRecordEvent(newRC, newReplicasCount, deployment)
    return true, err
}

-func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) (bool, error) {
+// Set expectationsCheck to false to bypass expectations check when testing
+func (dc *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationController, oldRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment, expectationsCheck bool) (bool, error) {
    oldPodsCount := deploymentutil.GetReplicaCountForRCs(oldRCs)
    if oldPodsCount == 0 {
        // Cant scale down further
@@ -208,8 +530,17 @@ func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControll
    // Check if we can scale down.
    minAvailable := deployment.Spec.Replicas - maxUnavailable
    minReadySeconds := deployment.Spec.Strategy.RollingUpdate.MinReadySeconds
+   // Check the expectations of deployment before counting available pods
+   dKey, err := controller.KeyFunc(&deployment)
+   if err != nil {
+       return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
+   }
+   if expectationsCheck && !dc.expectations.SatisfiedExpectations(dKey) {
+       fmt.Printf("Expectations not met yet before reconciling old RCs\n")
+       return false, nil
+   }
    // Find the number of ready pods.
-   readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(d.client, allRCs, minReadySeconds)
+   readyPodCount, err := deploymentutil.GetAvailablePodsForRCs(dc.client, allRCs, minReadySeconds)
    if err != nil {
        return false, fmt.Errorf("could not find available pods: %v", err)
    }
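The expectations gate added here exists because the local pod cache lags behind deletions the controller itself just issued; without it, a subsequent sync could count stale "ready" pods and scale old RCs down too aggressively. The sketch below condenses the bookkeeping cycle the code above relies on; it is an editorial illustration with hypothetical names and trimmed error handling, and it assumes the pod informer's DeleteFunc calls DeletionObserved for each deleted pod, as the controller's deletePod handler does.

// Sketch of the expectations cycle; names are illustrative, not from the PR.
package expectationsexample

import (
    "fmt"

    "k8s.io/kubernetes/pkg/apis/extensions"
    "k8s.io/kubernetes/pkg/controller"
)

// scaleDownOldRCsOnce gates one round of scale-down on the expectations ledger.
func scaleDownOldRCsOnce(exp controller.ControllerExpectationsInterface, d *extensions.Deployment, scaleDown func() (int, error)) error {
    dKey, err := controller.KeyFunc(d)
    if err != nil {
        return fmt.Errorf("couldn't get key for deployment %#v: %v", d, err)
    }
    // Wait until deletions recorded by a previous sync have all been observed.
    if !exp.SatisfiedExpectations(dKey) {
        return nil
    }
    deleted, err := scaleDown() // shrink an old RC and report how many pods that removes
    if err != nil {
        return err
    }
    // Record the deletions just issued so the next sync waits for the caches to catch up.
    exp.ExpectDeletions(dKey, deleted)
    return nil
}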
@@ -231,16 +562,23 @@ func (d *DeploymentController) reconcileOldRCs(allRCs []*api.ReplicationControll
        // Scale down.
        scaleDownCount := int(math.Min(float64(targetRC.Spec.Replicas), float64(totalScaleDownCount)))
        newReplicasCount := targetRC.Spec.Replicas - scaleDownCount
-       _, err = d.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment)
+       _, err = dc.scaleRCAndRecordEvent(targetRC, newReplicasCount, deployment)
        if err != nil {
            return false, err
        }
        totalScaleDownCount -= scaleDownCount
+       dKey, err := controller.KeyFunc(&deployment)
+       if err != nil {
+           return false, fmt.Errorf("Couldn't get key for deployment %#v: %v", deployment, err)
+       }
+       if expectationsCheck {
+           dc.expectations.ExpectDeletions(dKey, scaleDownCount)
+       }
    }
    return true, err
}

-func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) error {
+func (dc *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationController, newRC *api.ReplicationController, deployment extensions.Deployment) error {
    totalReplicas := deploymentutil.GetReplicaCountForRCs(allRCs)
    updatedReplicas := deploymentutil.GetReplicaCountForRCs([]*api.ReplicationController{newRC})
    newDeployment := deployment
@@ -249,29 +587,29 @@ func (d *DeploymentController) updateDeploymentStatus(allRCs []*api.ReplicationC
        Replicas:        totalReplicas,
        UpdatedReplicas: updatedReplicas,
    }
-   _, err := d.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
+   _, err := dc.expClient.Deployments(deployment.ObjectMeta.Namespace).UpdateStatus(&newDeployment)
    return err
}

-func (d *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationController, newScale int, deployment extensions.Deployment) (*api.ReplicationController, error) {
+func (dc *DeploymentController) scaleRCAndRecordEvent(rc *api.ReplicationController, newScale int, deployment extensions.Deployment) (*api.ReplicationController, error) {
    scalingOperation := "down"
    if rc.Spec.Replicas < newScale {
        scalingOperation = "up"
    }
-   newRC, err := d.scaleRC(rc, newScale)
+   newRC, err := dc.scaleRC(rc, newScale)
    if err == nil {
-       d.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
+       dc.eventRecorder.Eventf(&deployment, api.EventTypeNormal, "ScalingRC", "Scaled %s rc %s to %d", scalingOperation, rc.Name, newScale)
    }
    return newRC, err
}

-func (d *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) {
+func (dc *DeploymentController) scaleRC(rc *api.ReplicationController, newScale int) (*api.ReplicationController, error) {
    // TODO: Using client for now, update to use store when it is ready.
    rc.Spec.Replicas = newScale
-   return d.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc)
+   return dc.client.ReplicationControllers(rc.ObjectMeta.Namespace).Update(rc)
}

-func (d *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
+func (dc *DeploymentController) updateDeployment(deployment *extensions.Deployment) (*extensions.Deployment, error) {
    // TODO: Using client for now, update to use store when it is ready.
-   return d.client.Extensions().Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
+   return dc.expClient.Deployments(deployment.ObjectMeta.Namespace).Update(deployment)
}
@@ -21,10 +21,14 @@ import (
    "testing"

    "k8s.io/kubernetes/pkg/api"
+   "k8s.io/kubernetes/pkg/api/testapi"
+   "k8s.io/kubernetes/pkg/api/unversioned"
    exp "k8s.io/kubernetes/pkg/apis/extensions"
    "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/pkg/client/unversioned/testclient"
+   "k8s.io/kubernetes/pkg/controller"
    "k8s.io/kubernetes/pkg/runtime"
+   "k8s.io/kubernetes/pkg/util"
    "k8s.io/kubernetes/pkg/util/intstr"
)

@@ -190,7 +194,7 @@ func TestDeploymentController_reconcileOldRCs(t *testing.T) {
            client:        fake,
            eventRecorder: &record.FakeRecorder{},
        }
-       scaled, err := controller.reconcileOldRCs(allRcs, oldRcs, nil, deployment)
+       scaled, err := controller.reconcileOldRCs(allRcs, oldRcs, nil, deployment, false)
        if err != nil {
            t.Errorf("unexpected error: %v", err)
            continue
@ -258,3 +262,159 @@ func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOr
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var alwaysReady = func() bool { return true }
|
||||||
|
|
||||||
|
func newDeployment(replicas int) *exp.Deployment {
|
||||||
|
d := exp.Deployment{
|
||||||
|
TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Default.GroupVersion().String()},
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
			UID:             util.NewUUID(),
			Name:            "foobar",
			Namespace:       api.NamespaceDefault,
			ResourceVersion: "18",
		},
		Spec: exp.DeploymentSpec{
			Strategy: exp.DeploymentStrategy{
				Type:          exp.RollingUpdateDeploymentStrategyType,
				RollingUpdate: &exp.RollingUpdateDeployment{},
			},
			Replicas: replicas,
			Selector: map[string]string{"foo": "bar"},
			Template: api.PodTemplateSpec{
				ObjectMeta: api.ObjectMeta{
					Labels: map[string]string{
						"name": "foo",
						"type": "production",
					},
				},
				Spec: api.PodSpec{
					Containers: []api.Container{
						{
							Image: "foo/bar",
						},
					},
				},
			},
		},
	}
	return &d
}

func getKey(d *exp.Deployment, t *testing.T) string {
	if key, err := controller.KeyFunc(d); err != nil {
		t.Errorf("Unexpected error getting key for deployment %v: %v", d.Name, err)
		return ""
	} else {
		return key
	}
}

func newReplicationController(d *exp.Deployment, name string, replicas int) *api.ReplicationController {
	return &api.ReplicationController{
		ObjectMeta: api.ObjectMeta{
			Name:      name,
			Namespace: api.NamespaceDefault,
		},
		Spec: api.ReplicationControllerSpec{
			Replicas: 0,
			Template: &d.Spec.Template,
		},
	}
}

type fixture struct {
	t *testing.T

	client *testclient.Fake

	// Objects to put in the store.
	dStore   []*exp.Deployment
	rcStore  []*api.ReplicationController
	podStore []*api.Pod

	// Actions expected to happen on the client. Objects from here are also
	// preloaded into NewSimpleFake.
	actions []testclient.Action
	objects *api.List
}

func (f *fixture) expectUpdateDeploymentAction(d *exp.Deployment) {
	f.actions = append(f.actions, testclient.NewUpdateAction("deployments", d.Namespace, d))
	f.objects.Items = append(f.objects.Items, d)
}

func (f *fixture) expectCreateRCAction(rc *api.ReplicationController) {
	f.actions = append(f.actions, testclient.NewCreateAction("replicationcontrollers", rc.Namespace, rc))
	f.objects.Items = append(f.objects.Items, rc)
}

func (f *fixture) expectUpdateRCAction(rc *api.ReplicationController) {
	f.actions = append(f.actions, testclient.NewUpdateAction("replicationcontrollers", rc.Namespace, rc))
	f.objects.Items = append(f.objects.Items, rc)
}

func newFixture(t *testing.T) *fixture {
	f := &fixture{}
	f.t = t
	f.objects = &api.List{}
	return f
}

func (f *fixture) run(deploymentName string) {
	f.client = testclient.NewSimpleFake(f.objects)
	c := NewDeploymentController(f.client, controller.NoResyncPeriodFunc)
	c.rcStoreSynced = alwaysReady
	c.podStoreSynced = alwaysReady
	for _, d := range f.dStore {
		c.dStore.Store.Add(d)
	}
	for _, rc := range f.rcStore {
		c.rcStore.Store.Add(rc)
	}
	for _, pod := range f.podStore {
		c.podStore.Store.Add(pod)
	}

	err := c.syncDeployment(deploymentName)
	if err != nil {
		f.t.Errorf("error syncing deployment: %v", err)
	}

	actions := f.client.Actions()
	for i, action := range actions {
		if len(f.actions) < i+1 {
			f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:])
			break
		}

		expectedAction := f.actions[i]
		if !expectedAction.Matches(action.GetVerb(), action.GetResource()) {
			f.t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expectedAction, action)
			continue
		}
	}

	if len(f.actions) > len(actions) {
		f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(actions), f.actions[len(actions):])
	}
}

func TestSyncDeploymentCreatesRC(t *testing.T) {
	f := newFixture(t)

	d := newDeployment(1)
	f.dStore = append(f.dStore, d)

	// expect that one rc with zero replicas is created
	// then is updated to 1 replica
	rc := newReplicationController(d, "deploymentrc-4186632231", 0)
	updatedRC := newReplicationController(d, "deploymentrc-4186632231", 1)

	f.expectCreateRCAction(rc)
	f.expectUpdateRCAction(updatedRC)
	f.expectUpdateDeploymentAction(d)

	f.run(getKey(d, t))
}
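The fixture above keys everything off getKey: the string handed to f.run is the work-queue key that syncDeployment later splits back into namespace and name. Assuming controller.KeyFunc is the usual namespace/name key function from the cache package (its definition is not part of this diff), the key for the test deployment is simply "default/foobar". A minimal sketch of an extra assertion one could add to this test file under that assumption (test name is illustrative):

func TestDeploymentKey(t *testing.T) {
	// newDeployment sets Namespace to api.NamespaceDefault ("default") and
	// Name to "foobar", so a namespace/name key function should yield this.
	d := newDeployment(1)
	if key := getKey(d, t); key != "default/foobar" {
		t.Errorf("expected key %q, got %q", "default/foobar", key)
	}
}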
@@ -210,7 +210,7 @@ func (rm *ReplicationManager) getPodController(pod *api.Pod) *api.ReplicationCon
 		// overlap, sort by creation timestamp, subsort by name, then pick
 		// the first.
 		glog.Errorf("user error! more than one replication controller is selecting pods with labels: %+v", pod.Labels)
-		sort.Sort(overlappingControllers(controllers))
+		sort.Sort(OverlappingControllers(controllers))
 	}
 	return &controllers[0]
 }
@@ -57,12 +57,12 @@ func updateReplicaCount(rcClient client.ReplicationControllerInterface, controll
 }
 
 // OverlappingControllers sorts a list of controllers by creation timestamp, using their names as a tie breaker.
-type overlappingControllers []api.ReplicationController
+type OverlappingControllers []api.ReplicationController
 
-func (o overlappingControllers) Len() int      { return len(o) }
-func (o overlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+func (o OverlappingControllers) Len() int      { return len(o) }
+func (o OverlappingControllers) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
 
-func (o overlappingControllers) Less(i, j int) bool {
+func (o OverlappingControllers) Less(i, j int) bool {
 	if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
 		return o[i].Name < o[j].Name
 	}
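Exporting overlappingControllers as OverlappingControllers lets code outside this package (the deployment controller in this PR) pick a deterministic winner among replication controllers that select the same pods. A minimal sketch of the tie-break rule, written as an example test that could sit in this package's test file; it assumes only the fmt and sort imports on top of the package's existing api import:

func ExampleOverlappingControllers() {
	// With equal (zero) creation timestamps, Less falls back to comparing names.
	controllers := []api.ReplicationController{
		{ObjectMeta: api.ObjectMeta{Name: "b-controller"}},
		{ObjectMeta: api.ObjectMeta{Name: "a-controller"}},
	}
	sort.Sort(OverlappingControllers(controllers))
	fmt.Println(controllers[0].Name)
	// Output: a-controller
}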
@@ -28,27 +28,39 @@ import (
 	"k8s.io/kubernetes/pkg/util"
 )
 
-// Returns the old RCs targetted by the given Deployment.
+// GetOldRCs returns the old RCs targeted by the given Deployment; get PodList and RCList from client interface.
 func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.ReplicationController, error) {
+	return GetOldRCsFromLists(deployment, c,
+		func(namespace string, options api.ListOptions) (*api.PodList, error) {
+			return c.Pods(namespace).List(options)
+		},
+		func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) {
+			rcList, err := c.ReplicationControllers(namespace).List(options)
+			return rcList.Items, err
+		})
+}
+
+// GetOldRCsFromLists returns the old RCs targeted by the given Deployment; get PodList and RCList with input functions.
+func GetOldRCsFromLists(deployment extensions.Deployment, c client.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) ([]*api.ReplicationController, error) {
 	namespace := deployment.ObjectMeta.Namespace
 	// 1. Find all pods whose labels match deployment.Spec.Selector
 	selector := labels.SelectorFromSet(deployment.Spec.Selector)
 	options := api.ListOptions{LabelSelector: selector}
-	podList, err := c.Pods(namespace).List(options)
+	podList, err := getPodList(namespace, options)
 	if err != nil {
 		return nil, fmt.Errorf("error listing pods: %v", err)
 	}
 	// 2. Find the corresponding RCs for pods in podList.
 	// TODO: Right now we list all RCs and then filter. We should add an API for this.
 	oldRCs := map[string]api.ReplicationController{}
-	rcList, err := c.ReplicationControllers(namespace).List(api.ListOptions{})
+	rcList, err := getRcList(namespace, api.ListOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("error listing replication controllers: %v", err)
 	}
 	newRCTemplate := GetNewRCTemplate(deployment)
 	for _, pod := range podList.Items {
 		podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
-		for _, rc := range rcList.Items {
+		for _, rc := range rcList {
 			rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
 			if rcLabelsSelector.Matches(podLabelsSelector) {
 				// Filter out RC that has the same pod template spec as the deployment - that is the new RC.
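The point of the *FromLists split is that the pod and RC list calls are now injected, so the selector-matching logic can be exercised with canned slices instead of a live API server. A minimal sketch of such a wrapper (the helper name is illustrative; the fake client from pkg/client/unversioned/testclient is assumed only to satisfy the client.Interface parameter, which the body shown here never touches):

// oldRCsFromCannedLists drives GetOldRCsFromLists with fixed data.
func oldRCsFromCannedLists(deployment extensions.Deployment, pods []api.Pod, rcs []api.ReplicationController) ([]*api.ReplicationController, error) {
	return GetOldRCsFromLists(deployment, testclient.NewSimpleFake(),
		// Stand-in for c.Pods(namespace).List(options).
		func(namespace string, options api.ListOptions) (*api.PodList, error) {
			return &api.PodList{Items: pods}, nil
		},
		// Stand-in for c.ReplicationControllers(namespace).List(options).
		func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) {
			return rcs, nil
		})
}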
@@ -67,20 +79,30 @@ func GetOldRCs(deployment extensions.Deployment, c client.Interface) ([]*api.Rep
 	return requiredRCs, nil
 }
 
-// Returns an RC that matches the intent of the given deployment.
+// GetNewRC returns an RC that matches the intent of the given deployment; get RCList from client interface.
 // Returns nil if the new RC doesnt exist yet.
 func GetNewRC(deployment extensions.Deployment, c client.Interface) (*api.ReplicationController, error) {
+	return GetNewRCFromList(deployment, c,
+		func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) {
+			rcList, err := c.ReplicationControllers(namespace).List(options)
+			return rcList.Items, err
+		})
+}
+
+// GetNewRCFromList returns an RC that matches the intent of the given deployment; get RCList with the input function.
+// Returns nil if the new RC doesnt exist yet.
+func GetNewRCFromList(deployment extensions.Deployment, c client.Interface, getRcList func(string, api.ListOptions) ([]api.ReplicationController, error)) (*api.ReplicationController, error) {
 	namespace := deployment.ObjectMeta.Namespace
-	rcList, err := c.ReplicationControllers(namespace).List(api.ListOptions{})
+	rcList, err := getRcList(namespace, api.ListOptions{})
 	if err != nil {
 		return nil, fmt.Errorf("error listing replication controllers: %v", err)
 	}
 	newRCTemplate := GetNewRCTemplate(deployment)
 
-	for i := range rcList.Items {
-		if api.Semantic.DeepEqual(rcList.Items[i].Spec.Template, &newRCTemplate) {
+	for i := range rcList {
+		if api.Semantic.DeepEqual(rcList[i].Spec.Template, &newRCTemplate) {
 			// This is the new RC.
-			return &rcList.Items[i], nil
+			return &rcList[i], nil
 		}
 	}
 	// new RC does not exist.
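GetNewRCFromList gets the same treatment: the RC list is injected, and the function returns the RC whose pod template semantically equals the deployment's, or nil when none matches yet. A minimal sketch of driving it with a fixed slice, under the same assumptions as the previous sketch:

// newRCFromCannedList drives GetNewRCFromList with a fixed RC slice.
func newRCFromCannedList(deployment extensions.Deployment, rcs []api.ReplicationController) (*api.ReplicationController, error) {
	return GetNewRCFromList(deployment, testclient.NewSimpleFake(),
		func(namespace string, options api.ListOptions) ([]api.ReplicationController, error) {
			return rcs, nil
		})
}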
@@ -72,8 +72,14 @@ func testNewDeployment(f *Framework) {
 	})
 	Expect(err).NotTo(HaveOccurred())
 	defer func() {
+		deployment, err := c.Deployments(ns).Get(deploymentName)
+		Expect(err).NotTo(HaveOccurred())
 		Logf("deleting deployment %s", deploymentName)
 		Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
+		// TODO: remove this once we can delete rcs with deployment
+		newRC, err := deploymentutil.GetNewRC(*deployment, c)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
 	}()
 	// Check that deployment is created fine.
 	deployment, err := c.Deployments(ns).Get(deploymentName)
@@ -166,8 +172,14 @@ func testRollingUpdateDeployment(f *Framework) {
 	_, err = c.Deployments(ns).Create(&newDeployment)
 	Expect(err).NotTo(HaveOccurred())
 	defer func() {
+		deployment, err := c.Deployments(ns).Get(deploymentName)
+		Expect(err).NotTo(HaveOccurred())
 		Logf("deleting deployment %s", deploymentName)
 		Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
+		// TODO: remove this once we can delete rcs with deployment
+		newRC, err := deploymentutil.GetNewRC(*deployment, c)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
 	}()
 
 	err = waitForDeploymentStatus(c, ns, deploymentName, 3, 2, 4, 0)
@@ -178,9 +190,9 @@ func testRollingUpdateDeploymentEvents(f *Framework) {
 	ns := f.Namespace.Name
 	c := f.Client
 	// Create nginx pods.
-	deploymentPodLabels := map[string]string{"name": "sample-pod"}
+	deploymentPodLabels := map[string]string{"name": "sample-pod-2"}
 	rcPodLabels := map[string]string{
-		"name": "sample-pod",
+		"name": "sample-pod-2",
 		"pod":  "nginx",
 	}
 	rcName := "nginx-controller"
@@ -212,14 +224,14 @@ func testRollingUpdateDeploymentEvents(f *Framework) {
 		Expect(c.ReplicationControllers(ns).Delete(rcName)).NotTo(HaveOccurred())
 	}()
 	// Verify that the required pods have come up.
-	err = verifyPods(c, ns, "sample-pod", false, 1)
+	err = verifyPods(c, ns, "sample-pod-2", false, 1)
 	if err != nil {
 		Logf("error in waiting for pods to come up: %s", err)
 		Expect(err).NotTo(HaveOccurred())
 	}
 
 	// Create a deployment to delete nginx pods and instead bring up redis pods.
-	deploymentName := "redis-deployment"
+	deploymentName := "redis-deployment-2"
 	Logf("Creating deployment %s", deploymentName)
 	newDeployment := extensions.Deployment{
 		ObjectMeta: api.ObjectMeta{
@@ -247,8 +259,14 @@ func testRollingUpdateDeploymentEvents(f *Framework) {
 	_, err = c.Deployments(ns).Create(&newDeployment)
 	Expect(err).NotTo(HaveOccurred())
 	defer func() {
+		deployment, err := c.Deployments(ns).Get(deploymentName)
+		Expect(err).NotTo(HaveOccurred())
 		Logf("deleting deployment %s", deploymentName)
 		Expect(c.Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred())
+		// TODO: remove this once we can delete rcs with deployment
+		newRC, err := deploymentutil.GetNewRC(*deployment, c)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(c.ReplicationControllers(ns).Delete(newRC.Name)).NotTo(HaveOccurred())
 	}()
 
 	err = waitForDeploymentStatus(c, ns, deploymentName, 1, 0, 2, 0)