cmd/kube-controller-manager

Author: Chao Xu
Date:   2016-11-18 12:50:17 -08:00
Commit: 7eeb71f698 (parent 48536eaef9)

109 changed files with 4380 additions and 4153 deletions


@@ -25,11 +25,11 @@ import (
 	"reflect"
 
 	"github.com/golang/glog"
-	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
+	"k8s.io/kubernetes/pkg/api/v1"
 	"k8s.io/kubernetes/pkg/client/cache"
-	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
-	unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5"
+	unversioned_core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/cloudprovider"
 	"k8s.io/kubernetes/pkg/controller"
@@ -65,7 +65,7 @@ const (
 type cachedService struct {
 	// The cached state of the service
-	state *api.Service
+	state *v1.Service
 	// Controls error back-off
 	lastRetryDelay time.Duration
 }
 
@@ -78,7 +78,7 @@ type serviceCache struct {
 type ServiceController struct {
 	cloud            cloudprovider.Interface
 	knownHosts       []string
-	servicesToUpdate []*api.Service
+	servicesToUpdate []*v1.Service
 	kubeClient       clientset.Interface
 	clusterName      string
 	balancer         cloudprovider.LoadBalancer
@@ -100,7 +100,7 @@ type ServiceController struct {
 func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
-	recorder := broadcaster.NewRecorder(api.EventSource{Component: "service-controller"})
+	recorder := broadcaster.NewRecorder(v1.EventSource{Component: "service-controller"})
 
 	if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.Core().RESTClient().GetRateLimiter())
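
Note: with the recorder now built from a v1.EventSource, events are emitted as versioned objects. A hedged sketch of using the broadcaster/recorder pair once wired up as above (the reason and message are invented):

	// Assumes broadcaster and recorder are built exactly as in the hunk above,
	// and that service is a *v1.Service the controller is working on.
	recorder.Event(service, v1.EventTypeNormal, "ExampleReason", "an illustrative message")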
@@ -121,20 +121,20 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
 	}
 	s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
 		&cache.ListWatch{
-			ListFunc: func(options api.ListOptions) (pkg_runtime.Object, error) {
-				return s.kubeClient.Core().Services(api.NamespaceAll).List(options)
+			ListFunc: func(options v1.ListOptions) (pkg_runtime.Object, error) {
+				return s.kubeClient.Core().Services(v1.NamespaceAll).List(options)
 			},
-			WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-				return s.kubeClient.Core().Services(api.NamespaceAll).Watch(options)
+			WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
+				return s.kubeClient.Core().Services(v1.NamespaceAll).Watch(options)
 			},
 		},
-		&api.Service{},
+		&v1.Service{},
 		serviceSyncPeriod,
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: s.enqueueService,
 			UpdateFunc: func(old, cur interface{}) {
-				oldSvc, ok1 := old.(*api.Service)
-				curSvc, ok2 := cur.(*api.Service)
+				oldSvc, ok1 := old.(*v1.Service)
+				curSvc, ok2 := cur.(*v1.Service)
 				if ok1 && ok2 && s.needsUpdate(oldSvc, curSvc) {
 					s.enqueueService(cur)
 				}
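
Note: the ListWatch shape above generalizes to any core/v1 resource. As a sketch (not part of the commit), the equivalent pod list/watch against the same clientset would be:

	// kubeClient is assumed to be the same release_1_5 clientset used above.
	podLW := &cache.ListWatch{
		ListFunc: func(options v1.ListOptions) (pkg_runtime.Object, error) {
			return kubeClient.Core().Pods(v1.NamespaceAll).List(options)
		},
		WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
			return kubeClient.Core().Pods(v1.NamespaceAll).Watch(options)
		},
	}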
@@ -149,7 +149,7 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
 	return s, nil
 }
 
-// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
+// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item.
 func (s *ServiceController) enqueueService(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
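
Note: the comment's DeletionFinalStateUnknown case refers to the cache package's tombstone for deletes the watch missed; the store then yields a wrapper rather than the object itself. A hedged sketch of unwrapping it (extractService is a hypothetical helper, not in this commit):

	// extractService returns the *v1.Service hidden in either form of obj.
	func extractService(obj interface{}) (*v1.Service, bool) {
		if svc, ok := obj.(*v1.Service); ok {
			return svc, true
		}
		// cache.DeletedFinalStateUnknown carries the last known state
		// of an object whose deletion was not observed directly.
		if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
			svc, ok := tombstone.Obj.(*v1.Service)
			return svc, ok
		}
		return nil, false
	}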
@@ -175,8 +175,8 @@ func (s *ServiceController) Run(workers int) {
 	for i := 0; i < workers; i++ {
 		go wait.Until(s.worker, time.Second, wait.NeverStop)
 	}
 
-	nodeLW := cache.NewListWatchFromClient(s.kubeClient.Core().RESTClient(), "nodes", api.NamespaceAll, fields.Everything())
-	cache.NewReflector(nodeLW, &api.Node{}, s.nodeLister.Store, 0).Run()
+	nodeLW := cache.NewListWatchFromClient(s.kubeClient.Core().RESTClient(), "nodes", v1.NamespaceAll, fields.Everything())
+	cache.NewReflector(nodeLW, &v1.Node{}, s.nodeLister.Store, 0).Run()
 	go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
 }
@@ -224,7 +224,7 @@ func (s *ServiceController) init() error {
 // Returns an error if processing the service update failed, along with a time.Duration
 // indicating whether processing should be retried; zero means no-retry; otherwise
 // we should retry in that Duration.
-func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *api.Service, key string) (error, time.Duration) {
+func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
 
 	// cache the service, we need the info for service deletion
 	cachedService.state = service
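
Note: the (error, time.Duration) contract spelled out above is: zero delay means do not retry, anything else means requeue after that delay. A caller might consume it like this (a sketch; the requeue mechanism shown is illustrative, not the controller's actual one):

	err, delay := s.processServiceUpdate(cachedService, service, key)
	if err != nil && delay > 0 {
		// Requeue after the back-off delay expires.
		time.AfterFunc(delay, func() { s.enqueueService(service) })
	}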
@@ -237,7 +237,7 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
 			message += " (will not retry): "
 		}
 		message += err.Error()
-		s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)
+		s.eventRecorder.Event(service, v1.EventTypeWarning, "CreatingLoadBalancerFailed", message)
 		return err, cachedService.nextRetryDelay()
 	}
 
@@ -253,13 +253,13 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
 // Returns whatever error occurred along with a boolean indicator of whether it
 // should be retried.
-func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *api.Service) (error, bool) {
+func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) (error, bool) {
 
 	// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
 	// which may involve service interruption. Also, we would like user-friendly events.
 
 	// Save the state so we can avoid a write if it doesn't change
-	previousState := api.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
+	previousState := v1.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
 
 	if !wantsLoadBalancer(service) {
 		needDelete := true
@@ -273,31 +273,31 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *api.
 		if needDelete {
 			glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
-			s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
+			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 			if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
 				return err, retryable
 			}
-			s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
+			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 		}
 
-		service.Status.LoadBalancer = api.LoadBalancerStatus{}
+		service.Status.LoadBalancer = v1.LoadBalancerStatus{}
 	} else {
 		glog.V(2).Infof("Ensuring LB for service %s", key)
 
 		// TODO: We could do a dry-run here if wanted to avoid the spurious cloud-calls & events when we restart
 
 		// The load balancer doesn't exist yet, so create it.
-		s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
+		s.eventRecorder.Event(service, v1.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
 		err := s.createLoadBalancer(service)
 		if err != nil {
 			return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
 		}
-		s.eventRecorder.Event(service, api.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
+		s.eventRecorder.Event(service, v1.EventTypeNormal, "CreatedLoadBalancer", "Created load balancer")
 	}
 
 	// Write the state if changed
 	// TODO: Be careful here ... what if there were other changes to the service?
-	if !api.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
+	if !v1.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
 		if err := s.persistUpdate(service); err != nil {
 			return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
 		}
@@ -308,7 +308,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *api.
 	return nil, notRetryable
 }
 
-func (s *ServiceController) persistUpdate(service *api.Service) error {
+func (s *ServiceController) persistUpdate(service *v1.Service) error {
 	var err error
 	for i := 0; i < clientRetryCount; i++ {
 		_, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
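
Note: persistUpdate retries the status write a bounded number of times. A hedged sketch of the usual shape of that loop (clientRetryCount and clientRetryInterval are the controller's assumed constants; the give-up cases shown are typical, not quoted):

	for i := 0; i < clientRetryCount; i++ {
		_, err = s.kubeClient.Core().Services(service.Namespace).UpdateStatus(service)
		if err == nil {
			return nil
		}
		// NotFound: the service is gone, nothing left to update.
		// Conflict: another writer won; a later sync will see fresh state.
		if errors.IsNotFound(err) || errors.IsConflict(err) {
			return nil
		}
		time.Sleep(clientRetryInterval)
	}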
@@ -338,7 +338,7 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
 	return err
 }
 
-func (s *ServiceController) createLoadBalancer(service *api.Service) error {
+func (s *ServiceController) createLoadBalancer(service *v1.Service) error {
 	nodes, err := s.nodeLister.List()
 	if err != nil {
 		return err
@@ -381,10 +381,10 @@ func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
 // ListKeys implements the interface required by DeltaFIFO to list the keys we
 // already know about.
-func (s *serviceCache) allServices() []*api.Service {
+func (s *serviceCache) allServices() []*v1.Service {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	services := make([]*api.Service, 0, len(s.serviceMap))
+	services := make([]*v1.Service, 0, len(s.serviceMap))
 	for _, v := range s.serviceMap {
 		services = append(services, v.state)
 	}
@@ -421,12 +421,12 @@ func (s *serviceCache) delete(serviceName string) {
 	delete(s.serviceMap, serviceName)
 }
 
-func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api.Service) bool {
+func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool {
 	if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) {
 		return false
 	}
 	if wantsLoadBalancer(oldService) != wantsLoadBalancer(newService) {
-		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "Type", "%v -> %v",
+		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "Type", "%v -> %v",
 			oldService.Spec.Type, newService.Spec.Type)
 		return true
 	}
@@ -434,18 +434,18 @@ func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api
 		return true
 	}
 	if !loadBalancerIPsAreEqual(oldService, newService) {
-		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
+		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "LoadbalancerIP", "%v -> %v",
 			oldService.Spec.LoadBalancerIP, newService.Spec.LoadBalancerIP)
 		return true
 	}
 	if len(oldService.Spec.ExternalIPs) != len(newService.Spec.ExternalIPs) {
-		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
+		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Count: %v -> %v",
 			len(oldService.Spec.ExternalIPs), len(newService.Spec.ExternalIPs))
 		return true
 	}
 	for i := range oldService.Spec.ExternalIPs {
 		if oldService.Spec.ExternalIPs[i] != newService.Spec.ExternalIPs[i] {
-			s.eventRecorder.Eventf(newService, api.EventTypeNormal, "ExternalIP", "Added: %v",
+			s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "ExternalIP", "Added: %v",
 				newService.Spec.ExternalIPs[i])
 			return true
 		}
@@ -454,7 +454,7 @@ func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api
 		return true
 	}
 	if oldService.UID != newService.UID {
-		s.eventRecorder.Eventf(newService, api.EventTypeNormal, "UID", "%v -> %v",
+		s.eventRecorder.Eventf(newService, v1.EventTypeNormal, "UID", "%v -> %v",
 			oldService.UID, newService.UID)
 		return true
 	}
@@ -462,14 +462,14 @@ func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api
 	return false
 }
 
-func (s *ServiceController) loadBalancerName(service *api.Service) string {
+func (s *ServiceController) loadBalancerName(service *v1.Service) string {
 	return cloudprovider.GetLoadBalancerName(service)
 }
 
-func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
-	var protocol api.Protocol
+func getPortsForLB(service *v1.Service) ([]*v1.ServicePort, error) {
+	var protocol v1.Protocol
 
-	ports := []*api.ServicePort{}
+	ports := []*v1.ServicePort{}
 	for i := range service.Spec.Ports {
 		sp := &service.Spec.Ports[i]
 		// The check on protocol was removed here. The cloud provider itself is now responsible for all protocol validation
@@ -484,7 +484,7 @@ func getPortsForLB(service *api.Service) ([]*api.ServicePort, error) {
 	return ports, nil
 }
 
-func portsEqualForLB(x, y *api.Service) bool {
+func portsEqualForLB(x, y *v1.Service) bool {
 	xPorts, err := getPortsForLB(x)
 	if err != nil {
 		return false
@@ -496,7 +496,7 @@ func portsEqualForLB(x, y *api.Service) bool {
 	return portSlicesEqualForLB(xPorts, yPorts)
 }
 
-func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
+func portSlicesEqualForLB(x, y []*v1.ServicePort) bool {
 	if len(x) != len(y) {
 		return false
 	}
@@ -509,7 +509,7 @@ func portSlicesEqualForLB(x, y []*api.ServicePort) bool {
 	return true
 }
 
-func portEqualForLB(x, y *api.ServicePort) bool {
+func portEqualForLB(x, y *v1.ServicePort) bool {
 	// TODO: Should we check name? (In theory, an LB could expose it)
 	if x.Name != y.Name {
 		return false
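
Note: portsEqualForLB, portSlicesEqualForLB, and portEqualForLB together decide whether a Spec.Ports change requires touching the cloud load balancer. An illustrative comparison (values invented; assumes NodePort participates in the per-port check):

	x := &v1.ServicePort{Name: "http", Protocol: v1.ProtocolTCP, Port: 80, NodePort: 30080}
	y := &v1.ServicePort{Name: "http", Protocol: v1.ProtocolTCP, Port: 80, NodePort: 30081}
	// Only NodePort differs, which is what cloud load balancers forward to,
	// so under that assumption this reports false.
	same := portEqualForLB(x, y)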
@@ -569,11 +569,11 @@ func stringSlicesEqual(x, y []string) bool {
 	return true
 }
 
-func includeNodeFromNodeList(node *api.Node) bool {
+func includeNodeFromNodeList(node *v1.Node) bool {
 	return !node.Spec.Unschedulable
 }
 
-func hostsFromNodeList(list *api.NodeList) []string {
+func hostsFromNodeList(list *v1.NodeList) []string {
 	result := []string{}
 	for ix := range list.Items {
 		if includeNodeFromNodeList(&list.Items[ix]) {
@@ -583,7 +583,7 @@ func hostsFromNodeList(list *api.NodeList) []string {
 	return result
 }
 
-func hostsFromNodeSlice(nodes []*api.Node) []string {
+func hostsFromNodeSlice(nodes []*v1.Node) []string {
 	result := []string{}
 	for _, node := range nodes {
 		if includeNodeFromNodeList(node) {
@@ -594,7 +594,7 @@ func hostsFromNodeSlice(nodes []*api.Node) []string {
 }
 
 func getNodeConditionPredicate() cache.NodeConditionPredicate {
-	return func(node *api.Node) bool {
+	return func(node *v1.Node) bool {
 		// We add the master to the node list, but its unschedulable. So we use this to filter
 		// the master.
 		// TODO: Use a node annotation to indicate the master
@@ -608,7 +608,7 @@ func getNodeConditionPredicate() cache.NodeConditionPredicate {
 		for _, cond := range node.Status.Conditions {
 			// We consider the node for load balancing only when its NodeReady condition status
 			// is ConditionTrue
-			if cond.Type == api.NodeReady && cond.Status != api.ConditionTrue {
+			if cond.Type == v1.NodeReady && cond.Status != v1.ConditionTrue {
 				glog.V(4).Infof("Ignoring node %v with %v condition status %v", node.Name, cond.Type, cond.Status)
 				return false
 			}
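
Note: a sketch of how such a predicate filters a node slice down to load-balancer candidates (illustrative; the controller's real call site is the node lister):

	pred := getNodeConditionPredicate()
	candidates := []*v1.Node{}
	for _, node := range nodes {
		if pred(node) {
			candidates = append(candidates, node)
		}
	}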
@@ -648,7 +648,7 @@ func (s *ServiceController) nodeSyncLoop() {
 // updateLoadBalancerHosts updates all existing load balancers so that
 // they will match the list of hosts provided.
 // Returns the list of services that couldn't be updated.
-func (s *ServiceController) updateLoadBalancerHosts(services []*api.Service, hosts []string) (servicesToRetry []*api.Service) {
+func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []string) (servicesToRetry []*v1.Service) {
 	for _, service := range services {
 		func() {
 			if service == nil {
@@ -665,7 +665,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*api.Service, hos
 // Updates the load balancer of a service, assuming we hold the mutex
 // associated with the service.
-func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service, hosts []string) error {
+func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []string) error {
 	if !wantsLoadBalancer(service) {
 		return nil
 	}
 
@@ -673,7 +673,7 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service,
 	// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
 	err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
 	if err == nil {
-		s.eventRecorder.Event(service, api.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
+		s.eventRecorder.Event(service, v1.EventTypeNormal, "UpdatedLoadBalancer", "Updated load balancer with new hosts")
 		return nil
 	}
 
@@ -684,15 +684,15 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *api.Service,
 		return nil
 	}
 
-	s.eventRecorder.Eventf(service, api.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", hosts, err)
+	s.eventRecorder.Eventf(service, v1.EventTypeWarning, "LoadBalancerUpdateFailed", "Error updating load balancer with new hosts %v: %v", hosts, err)
 	return err
 }
 
-func wantsLoadBalancer(service *api.Service) bool {
-	return service.Spec.Type == api.ServiceTypeLoadBalancer
+func wantsLoadBalancer(service *v1.Service) bool {
+	return service.Spec.Type == v1.ServiceTypeLoadBalancer
 }
 
-func loadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
+func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
 	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
 }
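
Note: for orientation, an illustrative Service that wantsLoadBalancer reports true for:

	svc := &v1.Service{
		Spec: v1.ServiceSpec{Type: v1.ServiceTypeLoadBalancer},
	}
	isLB := wantsLoadBalancer(svc) // true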
@@ -736,7 +736,7 @@ func (s *ServiceController) syncService(key string) error {
 		glog.Infof("Service has been deleted %v", key)
 		err, retryDelay = s.processServiceDeletion(key)
 	} else {
-		service, ok := obj.(*api.Service)
+		service, ok := obj.(*v1.Service)
 		if ok {
 			cachedService = s.cache.getOrCreate(key)
 			err, retryDelay = s.processServiceUpdate(cachedService, service, key)
@@ -778,14 +778,14 @@ func (s *ServiceController) processServiceDeletion(key string) (error, time.Dura
 	if !wantsLoadBalancer(service) {
 		return nil, doNotRetry
 	}
 
-	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
+	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 	err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
 	if err != nil {
 		message := "Error deleting load balancer (will retry): " + err.Error()
-		s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
+		s.eventRecorder.Event(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", message)
 		return err, cachedService.nextRetryDelay()
 	}
-	s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
+	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 	s.cache.delete(key)
 	cachedService.resetRetryDelay()