mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Adding a deploymentController
This commit is contained in:
parent
00192ee944
commit
c97b9db700
@ -36,6 +36,7 @@ import (
|
|||||||
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
|
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
|
||||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||||
"k8s.io/kubernetes/pkg/controller/daemon"
|
"k8s.io/kubernetes/pkg/controller/daemon"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/deployment"
|
||||||
"k8s.io/kubernetes/pkg/controller/endpoint"
|
"k8s.io/kubernetes/pkg/controller/endpoint"
|
||||||
"k8s.io/kubernetes/pkg/controller/namespace"
|
"k8s.io/kubernetes/pkg/controller/namespace"
|
||||||
"k8s.io/kubernetes/pkg/controller/node"
|
"k8s.io/kubernetes/pkg/controller/node"
|
||||||
@ -72,6 +73,7 @@ type CMServer struct {
|
|||||||
PVClaimBinderSyncPeriod time.Duration
|
PVClaimBinderSyncPeriod time.Duration
|
||||||
VolumeConfigFlags VolumeConfigFlags
|
VolumeConfigFlags VolumeConfigFlags
|
||||||
HorizontalPodAutoscalerSyncPeriod time.Duration
|
HorizontalPodAutoscalerSyncPeriod time.Duration
|
||||||
|
DeploymentControllerSyncPeriod time.Duration
|
||||||
RegisterRetryCount int
|
RegisterRetryCount int
|
||||||
NodeMonitorGracePeriod time.Duration
|
NodeMonitorGracePeriod time.Duration
|
||||||
NodeStartupGracePeriod time.Duration
|
NodeStartupGracePeriod time.Duration
|
||||||
@ -88,6 +90,7 @@ type CMServer struct {
|
|||||||
AllocateNodeCIDRs bool
|
AllocateNodeCIDRs bool
|
||||||
EnableProfiling bool
|
EnableProfiling bool
|
||||||
EnableHorizontalPodAutoscaler bool
|
EnableHorizontalPodAutoscaler bool
|
||||||
|
EnableDeploymentController bool
|
||||||
|
|
||||||
Master string
|
Master string
|
||||||
Kubeconfig string
|
Kubeconfig string
|
||||||
@ -107,10 +110,12 @@ func NewCMServer() *CMServer {
|
|||||||
NamespaceSyncPeriod: 5 * time.Minute,
|
NamespaceSyncPeriod: 5 * time.Minute,
|
||||||
PVClaimBinderSyncPeriod: 10 * time.Second,
|
PVClaimBinderSyncPeriod: 10 * time.Second,
|
||||||
HorizontalPodAutoscalerSyncPeriod: 1 * time.Minute,
|
HorizontalPodAutoscalerSyncPeriod: 1 * time.Minute,
|
||||||
|
DeploymentControllerSyncPeriod: 1 * time.Minute,
|
||||||
RegisterRetryCount: 10,
|
RegisterRetryCount: 10,
|
||||||
PodEvictionTimeout: 5 * time.Minute,
|
PodEvictionTimeout: 5 * time.Minute,
|
||||||
ClusterName: "kubernetes",
|
ClusterName: "kubernetes",
|
||||||
EnableHorizontalPodAutoscaler: false,
|
EnableHorizontalPodAutoscaler: false,
|
||||||
|
EnableDeploymentController: false,
|
||||||
VolumeConfigFlags: VolumeConfigFlags{
|
VolumeConfigFlags: VolumeConfigFlags{
|
||||||
// default values here
|
// default values here
|
||||||
PersistentVolumeRecyclerTimeoutNFS: 300,
|
PersistentVolumeRecyclerTimeoutNFS: 300,
|
||||||
@ -145,6 +150,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
// TODO markt -- make this example a working config item with Recycler Config PR.
|
// TODO markt -- make this example a working config item with Recycler Config PR.
|
||||||
// fs.MyExample(&s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "pv-recycler-timeout-nfs", s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "The minimum timeout for an NFS PV recycling operation")
|
// fs.MyExample(&s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "pv-recycler-timeout-nfs", s.VolumeConfig.PersistentVolumeRecyclerTimeoutNFS, "The minimum timeout for an NFS PV recycling operation")
|
||||||
fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.")
|
fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.")
|
||||||
|
fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.")
|
||||||
fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
|
fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
|
||||||
fs.Float32Var(&s.DeletingPodsQps, "deleting-pods-qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
|
fs.Float32Var(&s.DeletingPodsQps, "deleting-pods-qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
|
||||||
fs.IntVar(&s.DeletingPodsBurst, "deleting-pods-burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
|
fs.IntVar(&s.DeletingPodsBurst, "deleting-pods-burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
|
||||||
@ -168,6 +174,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
|
|||||||
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
|
||||||
fs.StringVar(&s.RootCAFile, "root-ca-file", s.RootCAFile, "If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.")
|
fs.StringVar(&s.RootCAFile, "root-ca-file", s.RootCAFile, "If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.")
|
||||||
fs.BoolVar(&s.EnableHorizontalPodAutoscaler, "enable-horizontal-pod-autoscaler", s.EnableHorizontalPodAutoscaler, "Enables horizontal pod autoscaler (requires enabling experimental API on apiserver). NOT IMPLEMENTED YET!")
|
fs.BoolVar(&s.EnableHorizontalPodAutoscaler, "enable-horizontal-pod-autoscaler", s.EnableHorizontalPodAutoscaler, "Enables horizontal pod autoscaler (requires enabling experimental API on apiserver). NOT IMPLEMENTED YET!")
|
||||||
|
fs.BoolVar(&s.EnableDeploymentController, "enable-deployment-controller", s.EnableDeploymentController, "Enables deployment controller (requires enabling experimental API on apiserver). NOT IMPLEMENTED YET!")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run runs the CMServer. This should never exit.
|
// Run runs the CMServer. This should never exit.
|
||||||
@ -249,7 +256,7 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
resourceQuotaController.Run(s.ResourceQuotaSyncPeriod)
|
resourceQuotaController.Run(s.ResourceQuotaSyncPeriod)
|
||||||
|
|
||||||
// An OR of all flags to enable/disable experimental features
|
// An OR of all flags to enable/disable experimental features
|
||||||
experimentalMode := s.EnableHorizontalPodAutoscaler
|
experimentalMode := s.EnableHorizontalPodAutoscaler || s.EnableDeploymentController
|
||||||
namespaceController := namespacecontroller.NewNamespaceController(kubeClient, experimentalMode, s.NamespaceSyncPeriod)
|
namespaceController := namespacecontroller.NewNamespaceController(kubeClient, experimentalMode, s.NamespaceSyncPeriod)
|
||||||
namespaceController.Run()
|
namespaceController.Run()
|
||||||
|
|
||||||
@ -257,6 +264,10 @@ func (s *CMServer) Run(_ []string) error {
|
|||||||
horizontalPodAutoscalerController := podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient))
|
horizontalPodAutoscalerController := podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient))
|
||||||
horizontalPodAutoscalerController.Run(s.HorizontalPodAutoscalerSyncPeriod)
|
horizontalPodAutoscalerController.Run(s.HorizontalPodAutoscalerSyncPeriod)
|
||||||
}
|
}
|
||||||
|
if s.EnableDeploymentController {
|
||||||
|
deploymentController := deployment.New(kubeClient)
|
||||||
|
deploymentController.Run(s.DeploymentControllerSyncPeriod)
|
||||||
|
}
|
||||||
|
|
||||||
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
|
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
|
||||||
pvclaimBinder.Run()
|
pvclaimBinder.Run()
|
||||||
|
@ -68,6 +68,7 @@ duration-sec
|
|||||||
e2e-output-dir
|
e2e-output-dir
|
||||||
enable-debugging-handlers
|
enable-debugging-handlers
|
||||||
enable-horizontal-pod-autoscaler
|
enable-horizontal-pod-autoscaler
|
||||||
|
enable-deployment-controller
|
||||||
enable-server
|
enable-server
|
||||||
etcd-config
|
etcd-config
|
||||||
etcd-prefix
|
etcd-prefix
|
||||||
@ -101,6 +102,7 @@ ha-domain
|
|||||||
healthz-bind-address
|
healthz-bind-address
|
||||||
healthz-port
|
healthz-port
|
||||||
horizontal-pod-autoscaler-sync-period
|
horizontal-pod-autoscaler-sync-period
|
||||||
|
deployment-controller-sync-period
|
||||||
hostname-override
|
hostname-override
|
||||||
host-network-sources
|
host-network-sources
|
||||||
http-check-frequency
|
http-check-frequency
|
||||||
|
150
pkg/controller/deployment/deployment_controller.go
Normal file
150
pkg/controller/deployment/deployment_controller.go
Normal file
@ -0,0 +1,150 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package deployment
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"hash/adler32"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/apis/experimental"
|
||||||
|
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||||
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
|
"k8s.io/kubernetes/pkg/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DeploymentController struct {
|
||||||
|
client client.Interface
|
||||||
|
expClient client.ExperimentalInterface
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(client client.Interface) *DeploymentController {
|
||||||
|
return &DeploymentController{
|
||||||
|
client: client,
|
||||||
|
expClient: client.Experimental(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DeploymentController) Run(syncPeriod time.Duration) {
|
||||||
|
go util.Until(func() {
|
||||||
|
if err := d.reconcileDeployments(); err != nil {
|
||||||
|
glog.Errorf("Couldnt reconcile deployments: %v", err)
|
||||||
|
}
|
||||||
|
}, syncPeriod, util.NeverStop)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DeploymentController) reconcileDeployments() error {
|
||||||
|
list, err := d.expClient.Deployments(api.NamespaceAll).List(labels.Everything(), fields.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error listing deployments: %v", err)
|
||||||
|
}
|
||||||
|
for _, deployment := range list.Items {
|
||||||
|
if err := d.reconcileDeployment(&deployment); err != nil {
|
||||||
|
return fmt.Errorf("error in reconciling deployment: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DeploymentController) reconcileDeployment(deployment *experimental.Deployment) error {
|
||||||
|
targetedRCs, err := d.getTargetedRCs(deployment)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
desiredRC, err := d.getDesiredRC(deployment)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// TODO: Scale up and down the targeted and desired RCs.
|
||||||
|
// For now, just print their names, until we start doing something useful.
|
||||||
|
for _, targetedRC := range targetedRCs {
|
||||||
|
glog.Infof("TargetedRC: %s", targetedRC.ObjectMeta.Name)
|
||||||
|
}
|
||||||
|
glog.Infof("DesiredRC: %s", desiredRC.ObjectMeta.Name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DeploymentController) getTargetedRCs(deployment *experimental.Deployment) ([]api.ReplicationController, error) {
|
||||||
|
namespace := deployment.ObjectMeta.Namespace
|
||||||
|
// 1. Find all pods whose labels match deployment.Spec.Selector
|
||||||
|
podList, err := d.client.Pods(namespace).List(labels.SelectorFromSet(deployment.Spec.Selector), fields.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error listing pods: %v", err)
|
||||||
|
}
|
||||||
|
// 2. Find the corresponding RCs for pods in podList.
|
||||||
|
// TODO: Right now we list all RCs and then filter. We should add an API for this.
|
||||||
|
targetedRCs := map[string]api.ReplicationController{}
|
||||||
|
rcList, err := d.client.ReplicationControllers(namespace).List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error listing replication controllers: %v", err)
|
||||||
|
}
|
||||||
|
for _, pod := range podList.Items {
|
||||||
|
podLabelsSelector := labels.Set(pod.ObjectMeta.Labels)
|
||||||
|
for _, rc := range rcList.Items {
|
||||||
|
rcLabelsSelector := labels.SelectorFromSet(rc.Spec.Selector)
|
||||||
|
if rcLabelsSelector.Matches(podLabelsSelector) {
|
||||||
|
targetedRCs[rc.ObjectMeta.Name] = rc
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
requiredRCs := []api.ReplicationController{}
|
||||||
|
for _, value := range targetedRCs {
|
||||||
|
requiredRCs = append(requiredRCs, value)
|
||||||
|
}
|
||||||
|
return requiredRCs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns an RC that matches the intent of the given deployment.
|
||||||
|
// It creates a new RC if required.
|
||||||
|
func (d *DeploymentController) getDesiredRC(deployment *experimental.Deployment) (*api.ReplicationController, error) {
|
||||||
|
namespace := deployment.ObjectMeta.Namespace
|
||||||
|
// Find if the required RC exists already.
|
||||||
|
rcList, err := d.client.ReplicationControllers(namespace).List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error listing replication controllers: %v", err)
|
||||||
|
}
|
||||||
|
for _, rc := range rcList.Items {
|
||||||
|
if api.Semantic.DeepEqual(rc.Spec.Template, deployment.Spec.Template) {
|
||||||
|
// This is the desired RC.
|
||||||
|
return &rc, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// desired RC does not exist, create a new one.
|
||||||
|
podTemplateSpecHasher := adler32.New()
|
||||||
|
util.DeepHashObject(podTemplateSpecHasher, deployment.Spec.Template)
|
||||||
|
podTemplateSpecHash := podTemplateSpecHasher.Sum32()
|
||||||
|
rcName := fmt.Sprintf("deploymentrc-%d", podTemplateSpecHash)
|
||||||
|
desiredRC := api.ReplicationController{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: rcName,
|
||||||
|
Namespace: namespace,
|
||||||
|
},
|
||||||
|
Spec: api.ReplicationControllerSpec{
|
||||||
|
Replicas: 0,
|
||||||
|
Template: deployment.Spec.Template,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
createdRC, err := d.client.ReplicationControllers(namespace).Create(&desiredRC)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("error creating replication controller: %v", err)
|
||||||
|
}
|
||||||
|
return createdRC, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user