diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 869d6b03875..f22eeee4d93 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -36,6 +36,7 @@ import ( clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller/daemon" + "k8s.io/kubernetes/pkg/controller/deployment" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/node" @@ -72,6 +73,7 @@ type CMServer struct { PVClaimBinderSyncPeriod time.Duration VolumeConfigFlags VolumeConfigFlags HorizontalPodAutoscalerSyncPeriod time.Duration + DeploymentControllerSyncPeriod time.Duration RegisterRetryCount int NodeMonitorGracePeriod time.Duration NodeStartupGracePeriod time.Duration @@ -88,6 +90,7 @@ type CMServer struct { AllocateNodeCIDRs bool EnableProfiling bool EnableHorizontalPodAutoscaler bool + EnableDeploymentController bool Master string Kubeconfig string @@ -107,10 +110,12 @@ func NewCMServer() *CMServer { NamespaceSyncPeriod: 5 * time.Minute, PVClaimBinderSyncPeriod: 10 * time.Second, HorizontalPodAutoscalerSyncPeriod: 1 * time.Minute, + DeploymentControllerSyncPeriod: 1 * time.Minute, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, ClusterName: "kubernetes", EnableHorizontalPodAutoscaler: false, + EnableDeploymentController: false, VolumeConfigFlags: VolumeConfigFlags{ // default values here PersistentVolumeRecyclerTimeoutNFS: 300, @@ -145,6 +150,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { // TODO markt -- make this example a working config item with Recycler Config PR. // fs.MyExample(&s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "pv-recycler-timeout-nfs", s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "The minimum timeout for an NFS PV recycling operation") fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.") + fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.") fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.") fs.Float32Var(&s.DeletingPodsQps, "deleting-pods-qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.") fs.IntVar(&s.DeletingPodsBurst, "deleting-pods-burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.") @@ -168,6 +174,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.") fs.StringVar(&s.RootCAFile, "root-ca-file", s.RootCAFile, "If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.") fs.BoolVar(&s.EnableHorizontalPodAutoscaler, "enable-horizontal-pod-autoscaler", s.EnableHorizontalPodAutoscaler, "Enables horizontal pod autoscaler (requires enabling experimental API on apiserver). NOT IMPLEMENTED YET!") + fs.BoolVar(&s.EnableDeploymentController, "enable-deployment-controller", s.EnableDeploymentController, "Enables deployment controller (requires enabling experimental API on apiserver). NOT IMPLEMENTED YET!") } // Run runs the CMServer. This should never exit. @@ -249,7 +256,7 @@ func (s *CMServer) Run(_ []string) error { resourceQuotaController.Run(s.ResourceQuotaSyncPeriod) // An OR of all flags to enable/disable experimental features - experimentalMode := s.EnableHorizontalPodAutoscaler + experimentalMode := s.EnableHorizontalPodAutoscaler || s.EnableDeploymentController namespaceController := namespacecontroller.NewNamespaceController(kubeClient, experimentalMode, s.NamespaceSyncPeriod) namespaceController.Run() @@ -257,6 +264,10 @@ func (s *CMServer) Run(_ []string) error { horizontalPodAutoscalerController := podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient)) horizontalPodAutoscalerController.Run(s.HorizontalPodAutoscalerSyncPeriod) } + if s.EnableDeploymentController { + deploymentController := deployment.New(kubeClient) + deploymentController.Run(s.DeploymentControllerSyncPeriod) + } pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) pvclaimBinder.Run() diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 8f23af2de0f..59839911f7f 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -68,6 +68,7 @@ duration-sec e2e-output-dir enable-debugging-handlers enable-horizontal-pod-autoscaler +enable-deployment-controller enable-server etcd-config etcd-prefix @@ -101,6 +102,7 @@ ha-domain healthz-bind-address healthz-port horizontal-pod-autoscaler-sync-period +deployment-controller-sync-period hostname-override host-network-sources http-check-frequency diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go new file mode 100644 index 00000000000..b16ec67fac7 --- /dev/null +++ b/pkg/controller/deployment/deployment_controller.go @@ -0,0 +1,150 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deployment + +import ( + "fmt" + "hash/adler32" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/apis/experimental" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util" +) + +type DeploymentController struct { + client client.Interface + expClient client.ExperimentalInterface +} + +func New(client client.Interface) *DeploymentController { + return &DeploymentController{ + client: client, + expClient: client.Experimental(), + } +} + +func (d *DeploymentController) Run(syncPeriod time.Duration) { + go util.Until(func() { + if err := d.reconcileDeployments(); err != nil { + glog.Errorf("Couldnt reconcile deployments: %v", err) + } + }, syncPeriod, util.NeverStop) +} + +func (d *DeploymentController) reconcileDeployments() error { + list, err := d.expClient.Deployments(api.NamespaceAll).List(labels.Everything(), fields.Everything()) + if err != nil { + return fmt.Errorf("error listing deployments: %v", err) + } + for _, deployment := range list.Items { + if err := d.reconcileDeployment(&deployment); err != nil { + return fmt.Errorf("error in reconciling deployment: %v", err) + } + } + return nil +} + +func (d *DeploymentController) reconcileDeployment(deployment *experimental.Deployment) error { + targetedRCs, err := d.getTargetedRCs(deployment) + if err != nil { + return err + } + desiredRC, err := d.getDesiredRC(deployment) + if err != nil { + return err + } + // TODO: Scale up and down the targeted and desired RCs. + // For now, just print their names, until we start doing something useful. + for _, targetedRC := range targetedRCs { + glog.Infof("TargetedRC: %s", targetedRC.ObjectMeta.Name) + } + glog.Infof("DesiredRC: %s", desiredRC.ObjectMeta.Name) + return nil +} + +func (d *DeploymentController) getTargetedRCs(deployment *experimental.Deployment) ([]api.ReplicationController, error) { + namespace := deployment.ObjectMeta.Namespace + // 1. Find all pods whose labels match deployment.Spec.Selector + podList, err := d.client.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector), fields.Everything()) + if err != nil { + return nil, fmt.Errorf("error listing pods: %v", err) + } + // 2. Find the corresponding RCs for pods in podList. + // TODO: Right now we list all RCs and then filter. We should add an API for this. + targetedRCs := map[string]api.ReplicationController{} + rcList, err := d.client.ReplicationControllers(namespace).List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("error listing replication controllers: %v", err) + } + for _, pod := range podList.Items { + podLabelsSelector := labels.Set(pod.ObjectMeta.Labels) + for _, rc := range rcList.Items { + rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector) + if rcLabelsSelector.Matches(podLabelsSelector) { + targetedRCs[rc.ObjectMeta.Name] = rc + continue + } + } + } + requiredRCs := []api.ReplicationController{} + for _, value := range targetedRCs { + requiredRCs = append(requiredRCs, value) + } + return requiredRCs, nil +} + +// Returns an RC that matches the intent of the given deployment. +// It creates a new RC if required. +func (d *DeploymentController) getDesiredRC(deployment *experimental.Deployment) (*api.ReplicationController, error) { + namespace := deployment.ObjectMeta.Namespace + // Find if the required RC exists already. + rcList, err := d.client.ReplicationControllers(namespace).List(labels.Everything()) + if err != nil { + return nil, fmt.Errorf("error listing replication controllers: %v", err) + } + for _, rc := range rcList.Items { + if api.Semantic.DeepEqual(rc.Spec.Template, deployment.Spec.Template) { + // This is the desired RC. + return &rc, nil + } + } + // desired RC does not exist, create a new one. + podTemplateSpecHasher := adler32.New() + util.DeepHashObject(podTemplateSpecHasher, deployment.Spec.Template) + podTemplateSpecHash := podTemplateSpecHasher.Sum32() + rcName := fmt.Sprintf("deploymentrc-%d", podTemplateSpecHash) + desiredRC := api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: rcName, + Namespace: namespace, + }, + Spec: api.ReplicationControllerSpec{ + Replicas: 0, + Template: deployment.Spec.Template, + }, + } + createdRC, err := d.client.ReplicationControllers(namespace).Create(&desiredRC) + if err != nil { + return nil, fmt.Errorf("error creating replication controller: %v", err) + } + return createdRC, nil +}