diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index d816bb0a758..ca4530be06a 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -33,6 +33,7 @@ go_library( "//pkg/kubeapiserver/server:go_default_library", "//pkg/master:go_default_library", "//pkg/master/controller/crdregistration:go_default_library", + "//pkg/master/reconcilers:go_default_library", "//pkg/master/tunneler:go_default_library", "//pkg/quota/install:go_default_library", "//pkg/registry/cachesize:go_default_library", diff --git a/cmd/kube-apiserver/app/options/BUILD b/cmd/kube-apiserver/app/options/BUILD index f65e5b213d4..c584bb6729b 100644 --- a/cmd/kube-apiserver/app/options/BUILD +++ b/cmd/kube-apiserver/app/options/BUILD @@ -21,6 +21,7 @@ go_library( "//pkg/kubeapiserver/options:go_default_library", "//pkg/kubelet/client:go_default_library", "//pkg/master/ports:go_default_library", + "//pkg/master/reconcilers:go_default_library", "//plugin/pkg/admission/admit:go_default_library", "//plugin/pkg/admission/alwayspullimages:go_default_library", "//plugin/pkg/admission/antiaffinity:go_default_library", @@ -64,6 +65,7 @@ go_test( "//pkg/api:go_default_library", "//pkg/kubeapiserver/options:go_default_library", "//pkg/kubelet/client:go_default_library", + "//pkg/master/reconcilers:go_default_library", "//vendor/github.com/spf13/pflag:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/options:go_default_library", diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index 31cc0529434..1c2f4716a74 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -19,6 +19,7 @@ package options import ( "net" + "strings" "time" utilnet "k8s.io/apimachinery/pkg/util/net" @@ -29,6 +30,7 @@ import ( kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" kubeletclient 
"k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master/ports" + "k8s.io/kubernetes/pkg/master/reconcilers" // add the kubernetes feature gates _ "k8s.io/kubernetes/pkg/features" @@ -59,7 +61,6 @@ type ServerRunOptions struct { EventTTL time.Duration KubeletConfig kubeletclient.KubeletClientConfig KubernetesServiceNodePort int - MasterCount int MaxConnectionBytesPerSec int64 ServiceClusterIPRange net.IPNet // TODO: make this a list ServiceNodePortRange utilnet.PortRange @@ -70,6 +71,9 @@ type ServerRunOptions struct { ProxyClientKeyFile string EnableAggregatorRouting bool + + MasterCount int + EndpointReconcilerType string } // NewServerRunOptions creates a new ServerRunOptions object with default parameters @@ -88,9 +92,10 @@ func NewServerRunOptions() *ServerRunOptions { StorageSerialization: kubeoptions.NewStorageSerializationOptions(), APIEnablement: kubeoptions.NewAPIEnablementOptions(), - EnableLogsHandler: true, - EventTTL: 1 * time.Hour, - MasterCount: 1, + EnableLogsHandler: true, + EventTTL: 1 * time.Hour, + MasterCount: 1, + EndpointReconcilerType: string(reconcilers.MasterCountReconcilerType), KubeletConfig: kubeletclient.KubeletClientConfig{ Port: ports.KubeletPort, ReadOnlyPort: ports.KubeletReadOnlyPort, @@ -164,6 +169,9 @@ func (s *ServerRunOptions) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.MasterCount, "apiserver-count", s.MasterCount, "The number of apiservers running in the cluster, must be a positive number.") + fs.StringVar(&s.EndpointReconcilerType, "alpha-endpoint-reconciler-type", string(s.EndpointReconcilerType), + "Use an endpoint reconciler ("+strings.Join(reconcilers.AllTypes.Names(), ", ")+")") + // See #14282 for details on how to test/try this option out. // TODO: remove this comment once this option is tested in CI. 
fs.IntVar(&s.KubernetesServiceNodePort, "kubernetes-service-node-port", s.KubernetesServiceNodePort, ""+ diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 93a5d52ed6d..8930f505fd8 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -32,6 +32,7 @@ import ( kapi "k8s.io/kubernetes/pkg/api" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" + "k8s.io/kubernetes/pkg/master/reconcilers" ) func TestAddFlags(t *testing.T) { @@ -44,6 +45,7 @@ func TestAddFlags(t *testing.T) { "--admission-control-config-file=/admission-control-config", "--advertise-address=192.168.10.10", "--allow-privileged=false", + "--alpha-endpoint-reconciler-type=" + string(reconcilers.MasterCountReconcilerType), "--anonymous-auth=false", "--apiserver-count=5", "--audit-log-maxage=11", @@ -88,9 +90,10 @@ func TestAddFlags(t *testing.T) { // This is a snapshot of expected options parsed by args. 
expected := &ServerRunOptions{ - ServiceNodePortRange: DefaultServiceNodePortRange, - MasterCount: 5, - AllowPrivileged: false, + ServiceNodePortRange: DefaultServiceNodePortRange, + MasterCount: 5, + EndpointReconcilerType: string(reconcilers.MasterCountReconcilerType), + AllowPrivileged: false, GenericServerRunOptions: &apiserveroptions.ServerRunOptions{ AdvertiseAddress: net.ParseIP("192.168.10.10"), CorsAllowedOriginList: []string{"10.10.10.100", "10.10.10.200"}, diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index 143f8cdae5e..99d7993b972 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -77,6 +77,7 @@ import ( kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" kubeserver "k8s.io/kubernetes/pkg/kubeapiserver/server" "k8s.io/kubernetes/pkg/master" + "k8s.io/kubernetes/pkg/master/reconcilers" "k8s.io/kubernetes/pkg/master/tunneler" quotainstall "k8s.io/kubernetes/pkg/quota/install" "k8s.io/kubernetes/pkg/registry/cachesize" @@ -340,7 +341,8 @@ func CreateKubeAPIServerConfig(s *options.ServerRunOptions, nodeTunneler tunnele ServiceNodePortRange: s.ServiceNodePortRange, KubernetesServiceNodePort: s.KubernetesServiceNodePort, - MasterCount: s.MasterCount, + EndpointReconcilerType: reconcilers.Type(s.EndpointReconcilerType), + MasterCount: s.MasterCount, }, } diff --git a/pkg/master/BUILD b/pkg/master/BUILD index 8288c334650..d9e1d598132 100644 --- a/pkg/master/BUILD +++ b/pkg/master/BUILD @@ -19,7 +19,6 @@ go_library( deps = [ "//cmd/kube-apiserver/app/options:go_default_library", "//pkg/api:go_default_library", - "//pkg/api/endpoints:go_default_library", "//pkg/api/install:go_default_library", "//pkg/apis/admission/install:go_default_library", "//pkg/apis/admissionregistration/install:go_default_library", @@ -40,6 +39,7 @@ go_library( "//pkg/apis/storage/install:go_default_library", "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", 
"//pkg/kubelet/client:go_default_library", + "//pkg/master/reconcilers:go_default_library", "//pkg/master/tunneler:go_default_library", "//pkg/registry/admissionregistration/rest:go_default_library", "//pkg/registry/apps/rest:go_default_library", @@ -48,6 +48,8 @@ go_library( "//pkg/registry/autoscaling/rest:go_default_library", "//pkg/registry/batch/rest:go_default_library", "//pkg/registry/certificates/rest:go_default_library", + "//pkg/registry/core/endpoint:go_default_library", + "//pkg/registry/core/endpoint/storage:go_default_library", "//pkg/registry/core/rangeallocation:go_default_library", "//pkg/registry/core/rest:go_default_library", "//pkg/registry/core/service/ipallocator:go_default_library", @@ -95,6 +97,7 @@ go_library( "//vendor/k8s.io/apiserver/pkg/server:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library", + "//vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", ], @@ -122,6 +125,7 @@ go_test( "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/generated/openapi:go_default_library", "//pkg/kubelet/client:go_default_library", + "//pkg/master/reconcilers:go_default_library", "//pkg/registry/certificates/rest:go_default_library", "//pkg/registry/core/rest:go_default_library", "//pkg/registry/registrytest:go_default_library", @@ -172,6 +176,7 @@ filegroup( ":package-srcs", "//pkg/master/controller/crdregistration:all-srcs", "//pkg/master/ports:all-srcs", + "//pkg/master/reconcilers:all-srcs", "//pkg/master/tunneler:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/master/controller.go b/pkg/master/controller.go index feb347237a9..158b5d1f6f0 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -30,8 +30,8 @@ import ( 
"k8s.io/apimachinery/pkg/util/wait" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/endpoints" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + "k8s.io/kubernetes/pkg/master/reconcilers" "k8s.io/kubernetes/pkg/registry/core/rangeallocation" corerest "k8s.io/kubernetes/pkg/registry/core/rest" servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller" @@ -57,7 +57,7 @@ type Controller struct { ServiceNodePortInterval time.Duration ServiceNodePortRange utilnet.PortRange - EndpointReconciler EndpointReconciler + EndpointReconciler reconcilers.EndpointReconciler EndpointInterval time.Duration SystemNamespaces []string @@ -242,7 +242,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser if s, err := c.ServiceClient.Services(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}); err == nil { // The service already exists. if reconcile { - if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated { + if svc, updated := reconcilers.GetMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated { glog.Warningf("Resetting master service %q to %#v", serviceName, svc) _, err := c.ServiceClient.Services(metav1.NamespaceDefault).Update(svc) return err @@ -272,195 +272,3 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser } return err } - -// EndpointReconciler knows how to reconcile the endpoints for the apiserver service. -type EndpointReconciler interface { - // ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw). - // ReconcileEndpoints expects that the endpoints objects it manages will all be - // managed only by ReconcileEndpoints; therefore, to understand this, you need only - // understand the requirements. 
- // - // Requirements: - // * All apiservers MUST use the same ports for their {rw, ro} services. - // * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the - // endpoints for their {rw, ro} services. - // * ReconcileEndpoints is called periodically from all apiservers. - ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error -} - -// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of -// masters. masterCountEndpointReconciler implements EndpointReconciler. -type masterCountEndpointReconciler struct { - masterCount int - endpointClient coreclient.EndpointsGetter -} - -var _ EndpointReconciler = &masterCountEndpointReconciler{} - -// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a -// specified expected number of masters. -func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient.EndpointsGetter) *masterCountEndpointReconciler { - return &masterCountEndpointReconciler{ - masterCount: masterCount, - endpointClient: endpointClient, - } -} - -// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw). -// ReconcileEndpoints expects that the endpoints objects it manages will all be -// managed only by ReconcileEndpoints; therefore, to understand this, you need only -// understand the requirements and the body of this function. -// -// Requirements: -// * All apiservers MUST use the same ports for their {rw, ro} services. -// * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the -// endpoints for their {rw, ro} services. -// * All apiservers MUST know and agree on the number of apiservers expected -// to be running (c.masterCount). -// * ReconcileEndpoints is called periodically from all apiservers. 
-func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { - e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) - if err != nil { - e = &api.Endpoints{ - ObjectMeta: metav1.ObjectMeta{ - Name: serviceName, - Namespace: metav1.NamespaceDefault, - }, - } - } - if errors.IsNotFound(err) { - // Simply create non-existing endpoints for the service. - e.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: ip.String()}}, - Ports: endpointPorts, - }} - _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e) - return err - } - - // First, determine if the endpoint is in the format we expect (one - // subset, ports matching endpointPorts, N IP addresses). - formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts) - if !formatCorrect { - // Something is egregiously wrong, just re-make the endpoints record. - e.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: ip.String()}}, - Ports: endpointPorts, - }} - glog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e) - _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) - return err - } - if ipCorrect && portsCorrect { - return nil - } - if !ipCorrect { - // We *always* add our own IP address. - e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()}) - - // Lexicographic order is retained by this step. - e.Subsets = endpoints.RepackSubsets(e.Subsets) - - // If too many IP addresses, remove the ones lexicographically after our - // own IP address. Given the requirements stated at the top of - // this function, this should cause the list of IP addresses to - // become eventually correct. 
- if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount { - // addrs is a pointer because we're going to mutate it. - for i, addr := range *addrs { - if addr.IP == ip.String() { - for len(*addrs) > r.masterCount { - // wrap around if necessary. - remove := (i + 1) % len(*addrs) - *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...) - } - break - } - } - } - } - if !portsCorrect { - // Reset ports. - e.Subsets[0].Ports = endpointPorts - } - glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) - _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) - return err -} - -// Determine if the endpoint is in the format ReconcileEndpoints expects. -// -// Return values: -// * formatCorrect is true if exactly one subset is found. -// * ipCorrect is true when current master's IP is found and the number -// of addresses is less than or equal to the master count. -// * portsCorrect is true when endpoint ports exactly match provided ports. -// portsCorrect is only evaluated when reconcilePorts is set to true. -func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) { - if len(e.Subsets) != 1 { - return false, false, false - } - sub := &e.Subsets[0] - portsCorrect = true - if reconcilePorts { - if len(sub.Ports) != len(ports) { - portsCorrect = false - } - for i, port := range ports { - if len(sub.Ports) <= i || port != sub.Ports[i] { - portsCorrect = false - break - } - } - } - for _, addr := range sub.Addresses { - if addr.IP == ip { - ipCorrect = len(sub.Addresses) <= count - break - } - } - return true, ipCorrect, portsCorrect -} - -// * getMasterServiceUpdateIfNeeded sets service attributes for the -// given apiserver service. 
-// * getMasterServiceUpdateIfNeeded expects that the service object it -// manages will be managed only by getMasterServiceUpdateIfNeeded; -// therefore, to understand this, you need only understand the -// requirements and the body of this function. -// * getMasterServiceUpdateIfNeeded ensures that the correct ports are -// are set. -// -// Requirements: -// * All apiservers MUST use getMasterServiceUpdateIfNeeded and only -// getMasterServiceUpdateIfNeeded to manage service attributes -// * updateMasterService is called periodically from all apiservers. -func getMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.ServicePort, serviceType api.ServiceType) (s *api.Service, updated bool) { - // Determine if the service is in the format we expect - // (servicePorts are present and service type matches) - formatCorrect := checkServiceFormat(svc, servicePorts, serviceType) - if formatCorrect { - return svc, false - } - svc.Spec.Ports = servicePorts - svc.Spec.Type = serviceType - return svc, true -} - -// Determine if the service is in the correct format -// getMasterServiceUpdateIfNeeded expects (servicePorts are correct -// and service type matches). 
-func checkServiceFormat(s *api.Service, ports []api.ServicePort, serviceType api.ServiceType) (formatCorrect bool) { - if s.Spec.Type != serviceType { - return false - } - if len(ports) != len(s.Spec.Ports) { - return false - } - for i, port := range ports { - if port != s.Spec.Ports[i] { - return false - } - } - return true -} diff --git a/pkg/master/controller_test.go b/pkg/master/controller_test.go index 50dc1bab734..439f6f698b4 100644 --- a/pkg/master/controller_test.go +++ b/pkg/master/controller_test.go @@ -26,6 +26,7 @@ import ( core "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/master/reconcilers" ) func TestReconcileEndpoints(t *testing.T) { @@ -377,7 +378,7 @@ func TestReconcileEndpoints(t *testing.T) { if test.endpoints != nil { fakeClient = fake.NewSimpleClientset(test.endpoints) } - reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core()) + reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core()) err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) @@ -495,7 +496,7 @@ func TestReconcileEndpoints(t *testing.T) { if test.endpoints != nil { fakeClient = fake.NewSimpleClientset(test.endpoints) } - reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core()) + reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core()) err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) diff --git a/pkg/master/master.go b/pkg/master/master.go index f9578947f3a..2b5a2aa3a3a 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -50,12 +50,17 @@ import 
( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" serverstorage "k8s.io/apiserver/pkg/server/storage" + storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/api" + kapi "k8s.io/kubernetes/pkg/api" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" + "k8s.io/kubernetes/pkg/master/reconcilers" "k8s.io/kubernetes/pkg/master/tunneler" + "k8s.io/kubernetes/pkg/registry/core/endpoint" + endpointsstorage "k8s.io/kubernetes/pkg/registry/core/endpoint/storage" "k8s.io/kubernetes/pkg/routes" nodeutil "k8s.io/kubernetes/pkg/util/node" @@ -85,6 +90,10 @@ const ( // DefaultEndpointReconcilerInterval is the default amount of time for how often the endpoints for // the kubernetes Service are reconciled. DefaultEndpointReconcilerInterval = 10 * time.Second + // DefaultEndpointReconcilerTTL is the default TTL timeout for the storage layer + DefaultEndpointReconcilerTTL = 15 * time.Second + // DefaultStorageEndpoint is the default storage endpoint for the lease controller + DefaultStorageEndpoint = "kube-apiserver-endpoint" ) type ExtraConfig struct { @@ -133,6 +142,19 @@ type ExtraConfig struct { // Number of masters running; all masters must be started with the // same value for this field. (Numbers > 1 currently untested.) MasterCount int + + // MasterEndpointReconcileTTL sets the time to live in seconds of an + // endpoint record recorded by each master. The endpoints are checked at an + // interval that is 2/3 of this value and this value defaults to 15s if + // unset. 
In very large clusters, this value may be increased to reduce the + // possibility that the master endpoint record expires (due to other load + // on the etcd server) and causes masters to drop in and out of the + // kubernetes service record. It is not recommended to set this value below + // 15s. + MasterEndpointReconcileTTL time.Duration + + // Selects which reconciler to use + EndpointReconcilerType reconcilers.Type } type Config struct { @@ -153,7 +175,7 @@ type CompletedConfig struct { // EndpointReconcilerConfig holds the endpoint reconciler and endpoint reconciliation interval to be // used by the master. type EndpointReconcilerConfig struct { - Reconciler EndpointReconciler + Reconciler reconcilers.EndpointReconciler Interval time.Duration } @@ -164,6 +186,56 @@ type Master struct { ClientCARegistrationHook ClientCARegistrationHook } +func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler { + endpointClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) + return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointClient) +} + +func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler { + return reconcilers.NewNoneEndpointReconciler() +} + +func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler { + ttl := c.ExtraConfig.MasterEndpointReconcileTTL + config, err := c.ExtraConfig.StorageFactory.NewConfig(kapi.Resource("apiServerIPInfo")) + if err != nil { + glog.Fatalf("Error getting storage config for master leases: %v", err) + } + leaseStorage, _, err := storagefactory.Create(*config) + if err != nil { + glog.Fatalf("Error creating storage factory: %v", err) + } + endpointConfig, err := c.ExtraConfig.StorageFactory.NewConfig(kapi.Resource(DefaultStorageEndpoint)) + if err != nil { + glog.Fatalf("Error getting storage config: %v", err) + } + endpointsStorage := endpointsstorage.NewREST(generic.RESTOptions{ + StorageConfig: endpointConfig, + Decorator: 
generic.UndecoratedStorage, + DeleteCollectionWorkers: 0, + ResourcePrefix: c.ExtraConfig.StorageFactory.ResourcePrefix(kapi.Resource(DefaultStorageEndpoint)), + }) + endpointRegistry := endpoint.NewRegistry(endpointsStorage) + masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl) + return reconcilers.NewLeaseEndpointReconciler(endpointRegistry, masterLeases) +} + +func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler { + glog.Infof("Using reconciler: %v", c.ExtraConfig.EndpointReconcilerType) + switch c.ExtraConfig.EndpointReconcilerType { + // there are numerous test dependencies that depend on a default controller + case "", reconcilers.MasterCountReconcilerType: + return c.createMasterCountReconciler() + case reconcilers.LeaseEndpointReconcilerType: + return c.createLeaseReconciler() + case reconcilers.NoneEndpointReconcilerType: + return c.createNoneReconciler() + default: + glog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType) + } + return nil +} + // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. 
func (cfg *Config) Complete(informers informers.SharedInformerFactory) CompletedConfig { c := completedConfig{ @@ -203,10 +275,12 @@ func (cfg *Config) Complete(informers informers.SharedInformerFactory) Completed c.ExtraConfig.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval } + if c.ExtraConfig.MasterEndpointReconcileTTL == 0 { + c.ExtraConfig.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL + } + if c.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil { - // use a default endpoint reconciler if nothing is set - endpointClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) - c.ExtraConfig.EndpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointClient) + c.ExtraConfig.EndpointReconcilerConfig.Reconciler = cfg.createEndpointReconciler() } // this has always been hardcoded true in the past diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index e7721e0c3cb..2e971c39275 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -58,6 +58,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/rbac" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" + "k8s.io/kubernetes/pkg/master/reconcilers" certificatesrest "k8s.io/kubernetes/pkg/registry/certificates/rest" corerest "k8s.io/kubernetes/pkg/registry/core/rest" "k8s.io/kubernetes/pkg/registry/registrytest" @@ -76,6 +77,7 @@ func setUp(t *testing.T) (*etcdtesting.EtcdTestServer, Config, informers.SharedI APIResourceConfigSource: DefaultAPIResourceConfigSource(), APIServerServicePort: 443, MasterCount: 1, + EndpointReconcilerType: reconcilers.MasterCountReconcilerType, }, } diff --git a/pkg/master/reconcilers/BUILD b/pkg/master/reconcilers/BUILD new file mode 100644 index 00000000000..5030537eb72 --- /dev/null +++ b/pkg/master/reconcilers/BUILD @@ -0,0 +1,50 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = 
"go_default_library", + srcs = [ + "doc.go", + "lease.go", + "mastercount.go", + "none.go", + "reconcilers.go", + ], + visibility = ["//visibility:public"], + deps = [ + "//pkg/api:go_default_library", + "//pkg/api/endpoints:go_default_library", + "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", + "//pkg/registry/core/endpoint:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//vendor/k8s.io/apiserver/pkg/storage:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["lease_test.go"], + library = ":go_default_library", + deps = [ + "//pkg/api:go_default_library", + "//pkg/registry/registrytest:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/master/reconcilers/doc.go b/pkg/master/reconcilers/doc.go new file mode 100644 index 00000000000..d397aab778e --- /dev/null +++ b/pkg/master/reconcilers/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package reconcilers provides objects for managing the list of active masters. +// NOTE: The Lease reconciler is not the intended way for any apiserver other +// than kube-apiserver to accomplish the task of Endpoint registration. This is +// a special case for the time being. +package reconcilers diff --git a/pkg/master/reconcilers/lease.go b/pkg/master/reconcilers/lease.go new file mode 100644 index 00000000000..75e8d64c4a0 --- /dev/null +++ b/pkg/master/reconcilers/lease.go @@ -0,0 +1,251 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package reconcilers + +/* +Original Source: +https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler.go +*/ + +import ( + "fmt" + "net" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kruntime "k8s.io/apimachinery/pkg/runtime" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/storage" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/endpoints" + "k8s.io/kubernetes/pkg/registry/core/endpoint" +) + +// Leases is an interface which assists in managing the set of active masters +type Leases interface { + // ListLeases retrieves a list of the current master IPs + ListLeases() ([]string, error) + + // UpdateLease adds or refreshes a master's lease + UpdateLease(ip string) error +} + +type storageLeases struct { + storage storage.Interface + baseKey string + leaseTime time.Duration +} + +var _ Leases = &storageLeases{} + +// ListLeases retrieves a list of the current master IPs from storage +func (s *storageLeases) ListLeases() ([]string, error) { + ipInfoList := &api.EndpointsList{} + if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil { + return nil, err + } + + ipList := make([]string, len(ipInfoList.Items)) + for i, ip := range ipInfoList.Items { + ipList[i] = ip.Subsets[0].Addresses[0].IP + } + + glog.V(6).Infof("Current master IPs listed in storage are %v", ipList) + + return ipList, nil +} + +// UpdateLease resets the TTL on a master IP in storage +func (s *storageLeases) UpdateLease(ip string) error { + return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &api.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) { + // just make sure we've got the right IP set, and then refresh the TTL + existing := 
input.(*api.Endpoints) + existing.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{IP: ip}}, + }, + } + + leaseTime := uint64(s.leaseTime / time.Second) // storage TTL is in whole seconds, not Duration nanoseconds + + // NB: GuaranteedUpdate does not perform the store operation unless + // something changed between load and store (not including resource + // version), meaning we can't refresh the TTL without actually + // changing a field. + existing.Generation++ + + glog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, leaseTime) + + return existing, &leaseTime, nil + }) +} + +// NewLeases creates a new etcd-based Leases implementation. +func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duration) Leases { + return &storageLeases{ + storage: storage, + baseKey: baseKey, + leaseTime: leaseTime, + } +} + +type leaseEndpointReconciler struct { + endpointRegistry endpoint.Registry + masterLeases Leases +} + +// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler +func NewLeaseEndpointReconciler(endpointRegistry endpoint.Registry, masterLeases Leases) EndpointReconciler { + return &leaseEndpointReconciler{ + endpointRegistry: endpointRegistry, + masterLeases: masterLeases, + } +} + +// ReconcileEndpoints lists keys in a special etcd directory. +// Each key is expected to have a TTL of R+n, where R is the refresh interval +// at which this function is called, and n is some small value. If an +// apiserver goes down, it will fail to refresh its key's TTL and the key will +// expire. ReconcileEndpoints will notice that the endpoints object is +// different from the directory listing, and update the endpoints object +// accordingly. +func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { + ctx := apirequest.NewDefaultContext() + + // Refresh the TTL on our key, independently of whether any error or + // update conflict happens below. 
This makes sure that at least some of + // the masters will add our endpoint. + if err := r.masterLeases.UpdateLease(ip.String()); err != nil { + return err + } + + // Retrieve the current list of endpoints... + e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName, &metav1.GetOptions{}) + if err != nil { + if !errors.IsNotFound(err) { + return err + } + + e = &api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: serviceName, + Namespace: api.NamespaceDefault, + }, + } + } + + // ... and the list of master IP keys from etcd + masterIPs, err := r.masterLeases.ListLeases() + if err != nil { + return err + } + + // Since we just refreshed our own key, assume that zero endpoints + // returned from storage indicates an issue or invalid state, and thus do + // not update the endpoints list based on the result. + if len(masterIPs) == 0 { + return fmt.Errorf("no master IPs were listed in storage, refusing to erase all endpoints for the kubernetes service") + } + + // Next, we compare the current list of endpoints with the list of master IP keys + formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts) + if formatCorrect && ipCorrect && portsCorrect { + return nil + } + + if !formatCorrect { + // Something is egregiously wrong, just re-make the endpoints record. + e.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{}, + Ports: endpointPorts, + }} + } + + if !formatCorrect || !ipCorrect { + // repopulate the addresses according to the expected IPs from etcd + e.Subsets[0].Addresses = make([]api.EndpointAddress, len(masterIPs)) + for ind, ip := range masterIPs { + e.Subsets[0].Addresses[ind] = api.EndpointAddress{IP: ip} + } + + // Lexicographic order is retained by this step. + e.Subsets = endpoints.RepackSubsets(e.Subsets) + } + + if !portsCorrect { + // Reset ports. 
+ e.Subsets[0].Ports = endpointPorts + } + + glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs) + return r.endpointRegistry.UpdateEndpoints(ctx, e) +} + +// checkEndpointSubsetFormatWithLease determines if the endpoint is in the +// format ReconcileEndpoints expects when the controller is using leases. +// +// Return values: +// * formatCorrect is true if exactly one subset is found. +// * ipsCorrect when the addresses in the endpoints match the expected addresses list +// * portsCorrect is true when endpoint ports exactly match provided ports. +// portsCorrect is only evaluated when reconcilePorts is set to true. +func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string, ports []api.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) { + if len(e.Subsets) != 1 { + return false, false, false + } + sub := &e.Subsets[0] + portsCorrect = true + if reconcilePorts { + if len(sub.Ports) != len(ports) { + portsCorrect = false + } else { + for i, port := range ports { + if port != sub.Ports[i] { + portsCorrect = false + break + } + } + } + } + + ipsCorrect = true + if len(sub.Addresses) != len(expectedIPs) { + ipsCorrect = false + } else { + // check the actual content of the addresses + // present addrs is used as a set (the keys) and to indicate if a + // value was already found (the values) + presentAddrs := make(map[string]bool, len(expectedIPs)) + for _, ip := range expectedIPs { + presentAddrs[ip] = false + } + + // uniqueness is assumed amongst all Addresses. 
+ for _, addr := range sub.Addresses { + if alreadySeen, ok := presentAddrs[addr.IP]; alreadySeen || !ok { + ipsCorrect = false + break + } + + presentAddrs[addr.IP] = true + } + } + + return true, ipsCorrect, portsCorrect +} diff --git a/pkg/master/reconcilers/lease_test.go b/pkg/master/reconcilers/lease_test.go new file mode 100644 index 00000000000..5f2c66ee1e4 --- /dev/null +++ b/pkg/master/reconcilers/lease_test.go @@ -0,0 +1,531 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package reconcilers
+
+/*
+Original Source:
+https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler_test.go
+*/
+
+import (
+	"net"
+	"reflect"
+	"testing"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/registry/registrytest"
+)
+
+// fakeLeases is an in-memory Leases implementation for tests. The map value
+// records whether the key was refreshed through UpdateLease (true) or merely
+// seeded through SetKeys (false).
+type fakeLeases struct {
+	keys map[string]bool
+}
+
+var _ Leases = &fakeLeases{}
+
+// newFakeLeases returns a fakeLeases with no keys.
+func newFakeLeases() *fakeLeases {
+	return &fakeLeases{make(map[string]bool)}
+}
+
+// ListLeases returns every known key, refreshed or not, in map order.
+func (f *fakeLeases) ListLeases() ([]string, error) {
+	res := make([]string, 0, len(f.keys))
+	for ip := range f.keys {
+		res = append(res, ip)
+	}
+	return res, nil
+}
+
+// UpdateLease marks ip as refreshed, adding it if needed.
+func (f *fakeLeases) UpdateLease(ip string) error {
+	f.keys[ip] = true
+	return nil
+}
+
+// SetKeys seeds the given IPs as existing, not-yet-refreshed leases.
+func (f *fakeLeases) SetKeys(keys []string) {
+	for _, ip := range keys {
+		f.keys[ip] = false
+	}
+}
+
+// GetUpdatedKeys returns the IPs that were refreshed through UpdateLease.
+func (f *fakeLeases) GetUpdatedKeys() []string {
+	res := []string{}
+	for ip, updated := range f.keys {
+		if updated {
+			res = append(res, ip)
+		}
+	}
+	return res
+}
+
+// leaseReconcileCase is one table-driven scenario for the lease reconciler.
+type leaseReconcileCase struct {
+	testName      string
+	serviceName   string
+	ip            string
+	endpointPorts []api.EndpointPort
+	endpointKeys  []string
+	endpoints     *api.EndpointsList
+	expectUpdate  *api.Endpoints // nil means none expected
+}
+
+// runLeaseReconcileCases runs ReconcileEndpoints once per case against a fresh
+// fake registry and fake lease store, then checks that no error was returned,
+// that the registry saw exactly the expected update (or none), and that the
+// caller's own IP is the only lease that was refreshed.
+func runLeaseReconcileCases(t *testing.T, cases []leaseReconcileCase, reconcilePorts bool) {
+	for _, test := range cases {
+		fakeLeases := newFakeLeases()
+		fakeLeases.SetKeys(test.endpointKeys)
+		registry := &registrytest.EndpointRegistry{
+			Endpoints: test.endpoints,
+		}
+		r := NewLeaseEndpointReconciler(registry, fakeLeases)
+		err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, reconcilePorts)
+		if err != nil {
+			t.Errorf("case %q: unexpected error: %v", test.testName, err)
+		}
+		if test.expectUpdate != nil {
+			if len(registry.Updates) != 1 {
+				t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
+			} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
+				t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
+			}
+		}
+		if test.expectUpdate == nil && len(registry.Updates) > 0 {
+			t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
+		}
+		if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip {
+			t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys)
+		}
+	}
+}
+
+// TestLeaseEndpointReconciler exercises the lease-based reconciler with port
+// reconciliation enabled (first table) and disabled (second table).
+func TestLeaseEndpointReconciler(t *testing.T) {
+	ns := api.NamespaceDefault
+	om := func(name string) metav1.ObjectMeta {
+		return metav1.ObjectMeta{Namespace: ns, Name: name}
+	}
+	reconcileTests := []leaseReconcileCase{
+		{
+			testName:      "no existing endpoints",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpoints:     nil,
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints satisfy",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints satisfy + refresh existing key",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpointKeys:  []string{"1.2.3.4"},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints satisfy but too many",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints satisfy but too many + extra masters",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpointKeys:  []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{
+							{IP: "1.2.3.4"},
+							{IP: "4.3.2.1"},
+							{IP: "4.3.2.2"},
+							{IP: "4.3.2.3"},
+							{IP: "4.3.2.4"},
+						},
+						Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{
+						{IP: "1.2.3.4"},
+						{IP: "4.3.2.2"},
+						{IP: "4.3.2.3"},
+						{IP: "4.3.2.4"},
+					},
+					Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints satisfy but too many + extra masters + delete first",
+			serviceName:   "foo",
+			ip:            "4.3.2.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpointKeys:  []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{
+							{IP: "1.2.3.4"},
+							{IP: "4.3.2.1"},
+							{IP: "4.3.2.2"},
+							{IP: "4.3.2.3"},
+							{IP: "4.3.2.4"},
+						},
+						Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{
+						{IP: "4.3.2.1"},
+						{IP: "4.3.2.2"},
+						{IP: "4.3.2.3"},
+						{IP: "4.3.2.4"},
+					},
+					Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints current IP missing",
+			serviceName:   "foo",
+			ip:            "4.3.2.2",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpointKeys:  []string{"4.3.2.1"},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{
+							{IP: "4.3.2.1"},
+						},
+						Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{
+						{IP: "4.3.2.1"},
+						{IP: "4.3.2.2"},
+					},
+					Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints wrong name",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("bar"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints wrong IP",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints wrong port",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints wrong protocol",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "existing endpoints wrong port name",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:    "existing endpoints extra service ports satisfy",
+			serviceName: "foo",
+			ip:          "1.2.3.4",
+			endpointPorts: []api.EndpointPort{
+				{Name: "foo", Port: 8080, Protocol: "TCP"},
+				{Name: "bar", Port: 1000, Protocol: "TCP"},
+				{Name: "baz", Port: 1010, Protocol: "TCP"},
+			},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports: []api.EndpointPort{
+							{Name: "foo", Port: 8080, Protocol: "TCP"},
+							{Name: "bar", Port: 1000, Protocol: "TCP"},
+							{Name: "baz", Port: 1010, Protocol: "TCP"},
+						},
+					}},
+				}},
+			},
+		},
+		{
+			testName:    "existing endpoints extra service ports missing port",
+			serviceName: "foo",
+			ip:          "1.2.3.4",
+			endpointPorts: []api.EndpointPort{
+				{Name: "foo", Port: 8080, Protocol: "TCP"},
+				{Name: "bar", Port: 1000, Protocol: "TCP"},
+			},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports: []api.EndpointPort{
+						{Name: "foo", Port: 8080, Protocol: "TCP"},
+						{Name: "bar", Port: 1000, Protocol: "TCP"},
+					},
+				}},
+			},
+		},
+	}
+	runLeaseReconcileCases(t, reconcileTests, true)
+
+	nonReconcileTests := []leaseReconcileCase{
+		{
+			testName:    "existing endpoints extra service ports missing port no update",
+			serviceName: "foo",
+			ip:          "1.2.3.4",
+			endpointPorts: []api.EndpointPort{
+				{Name: "foo", Port: 8080, Protocol: "TCP"},
+				{Name: "bar", Port: 1000, Protocol: "TCP"},
+			},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: nil,
+		},
+		{
+			testName:    "existing endpoints extra service ports, wrong ports, wrong IP",
+			serviceName: "foo",
+			ip:          "1.2.3.4",
+			endpointPorts: []api.EndpointPort{
+				{Name: "foo", Port: 8080, Protocol: "TCP"},
+				{Name: "bar", Port: 1000, Protocol: "TCP"},
+			},
+			endpoints: &api.EndpointsList{
+				Items: []api.Endpoints{{
+					ObjectMeta: om("foo"),
+					Subsets: []api.EndpointSubset{{
+						Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}},
+						Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+					}},
+				}},
+			},
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+		{
+			testName:      "no existing endpoints",
+			serviceName:   "foo",
+			ip:            "1.2.3.4",
+			endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+			endpoints:     nil,
+			expectUpdate: &api.Endpoints{
+				ObjectMeta: om("foo"),
+				Subsets: []api.EndpointSubset{{
+					Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
+					Ports:     []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
+				}},
+			},
+		},
+	}
+	runLeaseReconcileCases(t, nonReconcileTests, false)
+}
diff --git a/pkg/master/reconcilers/mastercount.go b/pkg/master/reconcilers/mastercount.go
new file mode 100644
index 00000000000..5984fec2d60
--- /dev/null
+++ b/pkg/master/reconcilers/mastercount.go
@@ -0,0 +1,204 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package reconcilers master count based reconciler
+package reconcilers
+
+import (
+	"net"
+
+	"github.com/golang/glog"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/endpoints"
+	coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
+)
+
+// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
+// masters. masterCountEndpointReconciler implements EndpointReconciler.
+type masterCountEndpointReconciler struct {
+	masterCount    int
+	endpointClient coreclient.EndpointsGetter
+}
+
+// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
+// specified expected number of masters.
+func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient.EndpointsGetter) EndpointReconciler {
+	return &masterCountEndpointReconciler{
+		masterCount:    masterCount,
+		endpointClient: endpointClient,
+	}
+}
+
+// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
+// ReconcileEndpoints expects that the endpoints objects it manages will all be
+// managed only by ReconcileEndpoints; therefore, to understand this, you need only
+// understand the requirements and the body of this function.
+//
+// Requirements:
+//  * All apiservers MUST use the same ports for their {rw, ro} services.
+//  * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
+//      endpoints for their {rw, ro} services.
+//  * All apiservers MUST know and agree on the number of apiservers expected
+//      to be running (r.masterCount).
+//  * ReconcileEndpoints is called periodically from all apiservers.
+func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
+	e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
+	if err != nil {
+		// NOTE(review): any Get error, not only NotFound, replaces e with an
+		// empty object; a non-NotFound error then falls through to the
+		// "format incorrect" reset below. Confirm this is intended.
+		e = &api.Endpoints{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      serviceName,
+				Namespace: metav1.NamespaceDefault,
+			},
+		}
+	}
+	if errors.IsNotFound(err) {
+		// Simply create non-existing endpoints for the service.
+		e.Subsets = []api.EndpointSubset{{
+			Addresses: []api.EndpointAddress{{IP: ip.String()}},
+			Ports:     endpointPorts,
+		}}
+		_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e)
+		return err
+	}
+
+	// First, determine if the endpoint is in the format we expect (one
+	// subset, ports matching endpointPorts, N IP addresses).
+	formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
+	if !formatCorrect {
+		// Something is egregiously wrong, just re-make the endpoints record.
+		e.Subsets = []api.EndpointSubset{{
+			Addresses: []api.EndpointAddress{{IP: ip.String()}},
+			Ports:     endpointPorts,
+		}}
+		glog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
+		_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
+		return err
+	}
+	if ipCorrect && portsCorrect {
+		return nil
+	}
+	if !ipCorrect {
+		// We *always* add our own IP address.
+		e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()})
+
+		// Lexicographic order is retained by this step.
+		e.Subsets = endpoints.RepackSubsets(e.Subsets)
+
+		// If too many IP addresses, remove the ones lexicographically after our
+		// own IP address.  Given the requirements stated at the top of
+		// this function, this should cause the list of IP addresses to
+		// become eventually correct.
+		e.Subsets[0].Addresses = trimMasterAddresses(e.Subsets[0].Addresses, ip.String(), r.masterCount)
+	}
+	if !portsCorrect {
+		// Reset ports.
+		e.Subsets[0].Ports = endpointPorts
+	}
+	glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
+	_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
+	return err
+}
+
+// trimMasterAddresses shrinks addrs to at most count entries by repeatedly
+// removing the entry that follows ownIP, wrapping around to the front of the
+// list when ownIP is last. The ownIP entry itself is never removed. addrs is
+// returned unchanged when it is already short enough or does not contain
+// ownIP. The backing array of addrs is reused.
+func trimMasterAddresses(addrs []api.EndpointAddress, ownIP string, count int) []api.EndpointAddress {
+	if len(addrs) <= count {
+		return addrs
+	}
+	own := -1
+	for i := range addrs {
+		if addrs[i].IP == ownIP {
+			own = i
+			break
+		}
+	}
+	if own < 0 {
+		return addrs
+	}
+	// The len > 1 guard keeps our own entry even for a non-positive count.
+	for len(addrs) > count && len(addrs) > 1 {
+		// wrap around if necessary.
+		remove := (own + 1) % len(addrs)
+		if remove < own {
+			// Deleting an entry in front of ours shifts our index down by
+			// one; without this adjustment a later iteration could select
+			// our own entry for removal.
+			own--
+		}
+		addrs = append(addrs[:remove], addrs[remove+1:]...)
+	}
+	return addrs
+}
+
+// Determine if the endpoint is in the format ReconcileEndpoints expects.
+//
+// Return values:
+// * formatCorrect is true if exactly one subset is found. When it is false,
+//     the other two results are false as well.
+// * ipCorrect is true when current master's IP is found and the number
+//     of addresses is less than or equal to the master count.
+// * portsCorrect is true when endpoint ports exactly match provided ports.
+//     portsCorrect is only evaluated when reconcilePorts is set to true.
+func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
+	if len(e.Subsets) != 1 {
+		return false, false, false
+	}
+	sub := &e.Subsets[0]
+	portsCorrect = true
+	if reconcilePorts {
+		if len(sub.Ports) != len(ports) {
+			portsCorrect = false
+		} else {
+			for i, port := range ports {
+				if port != sub.Ports[i] {
+					portsCorrect = false
+					break
+				}
+			}
+		}
+	}
+	for _, addr := range sub.Addresses {
+		if addr.IP == ip {
+			ipCorrect = len(sub.Addresses) <= count
+			break
+		}
+	}
+	return true, ipCorrect, portsCorrect
+}
+
+// GetMasterServiceUpdateIfNeeded sets service attributes for the
+//     given apiserver service.
+// * GetMasterServiceUpdateIfNeeded expects that the service object it
+//     manages will be managed only by GetMasterServiceUpdateIfNeeded;
+//     therefore, to understand this, you need only understand the
+//     requirements and the body of this function.
+// * GetMasterServiceUpdateIfNeeded ensures that the correct ports and
+//     service type are set.
+//
+// Requirements:
+// * All apiservers MUST use GetMasterServiceUpdateIfNeeded and only
+//     GetMasterServiceUpdateIfNeeded to manage service attributes
+// * GetMasterServiceUpdateIfNeeded is called periodically from all apiservers.
+//
+// The returned service is svc itself (modified in place when an update is
+// needed); updated reports whether anything was changed.
+func GetMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.ServicePort, serviceType api.ServiceType) (s *api.Service, updated bool) {
+	// Determine if the service is in the format we expect
+	// (servicePorts are present and service type matches)
+	formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
+	if formatCorrect {
+		return svc, false
+	}
+	svc.Spec.Ports = servicePorts
+	svc.Spec.Type = serviceType
+	return svc, true
+}
+
+// Determine if the service is in the correct format
+// GetMasterServiceUpdateIfNeeded expects (servicePorts are correct
+// and service type matches).
+func checkServiceFormat(s *api.Service, ports []api.ServicePort, serviceType api.ServiceType) (formatCorrect bool) {
+	if s.Spec.Type != serviceType {
+		return false
+	}
+	if len(ports) != len(s.Spec.Ports) {
+		return false
+	}
+	for i, port := range ports {
+		if port != s.Spec.Ports[i] {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/master/reconcilers/none.go b/pkg/master/reconcilers/none.go
new file mode 100644
index 00000000000..33e65ab7db0
--- /dev/null
+++ b/pkg/master/reconcilers/none.go
@@ -0,0 +1,38 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package reconcilers a noop based reconciler
+package reconcilers
+
+import (
+	"net"
+
+	"k8s.io/kubernetes/pkg/api"
+)
+
+// noneEndpointReconciler is the EndpointReconciler used when endpoint
+// reconciliation is disabled; it carries no state.
+type noneEndpointReconciler struct{}
+
+// NewNoneEndpointReconciler creates a new EndpointReconciler that does
+// nothing. It is a no-op.
+func NewNoneEndpointReconciler() EndpointReconciler {
+	return new(noneEndpointReconciler)
+}
+
+// ReconcileEndpoints ignores all of its arguments and always returns nil.
+func (*noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
+	return nil
+}
diff --git a/pkg/master/reconcilers/reconcilers.go b/pkg/master/reconcilers/reconcilers.go
new file mode 100644
index 00000000000..346fa8fb529
--- /dev/null
+++ b/pkg/master/reconcilers/reconcilers.go
@@ -0,0 +1,70 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package reconcilers Endpoint Reconcilers for the apiserver
+package reconcilers
+
+import (
+	"net"
+
+	"k8s.io/kubernetes/pkg/api"
+)
+
+// EndpointReconciler knows how to reconcile the endpoints for the apiserver service.
+type EndpointReconciler interface {
+	// ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
+	// ReconcileEndpoints expects that the endpoints objects it manages will all be
+	// managed only by ReconcileEndpoints; therefore, to understand this, you need only
+	// understand the requirements.
+	//
+	// Requirements:
+	//  * All apiservers MUST use the same ports for their {rw, ro} services.
+	//  * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
+	//      endpoints for their {rw, ro} services.
+	//  * ReconcileEndpoints is called periodically from all apiservers.
+	ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error
+}
+
+// Type is the name of an endpoint reconciler implementation.
+type Type string
+
+// Every constant carries the Type type explicitly. In a parenthesized const
+// group only an omitted expression list repeats the previous type, so
+// `Name = "value"` on its own declares an untyped string constant.
+const (
+	// MasterCountReconcilerType will select the original reconciler
+	MasterCountReconcilerType Type = "master-count"
+	// LeaseEndpointReconcilerType will select a storage based reconciler
+	LeaseEndpointReconcilerType Type = "lease"
+	// NoneEndpointReconcilerType will turn off the endpoint reconciler
+	NoneEndpointReconcilerType Type = "none"
+)
+
+// Types is a list of reconciler types.
+type Types []Type
+
+// AllTypes lists every reconciler type declared in this file.
+var AllTypes = Types{
+	MasterCountReconcilerType,
+	LeaseEndpointReconcilerType,
+	NoneEndpointReconcilerType,
+}
+
+// Names returns the reconciler names as plain strings, in the order of t.
+func (t Types) Names() []string {
+	strs := make([]string, len(t))
+	for i, v := range t {
+		strs[i] = string(v)
+	}
+	return strs
+}