diff --git a/pkg/master/BUILD b/pkg/master/BUILD index 30d7eb6fd43..0ba09673109 100644 --- a/pkg/master/BUILD +++ b/pkg/master/BUILD @@ -115,6 +115,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 3fdfe1a0a23..3e90b84f692 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -19,6 +19,7 @@ package master import ( "fmt" "net" + "net/http" "time" corev1 "k8s.io/api/core/v1" @@ -31,6 +32,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" utilfeature "k8s.io/apiserver/pkg/util/feature" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" "k8s.io/klog" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/master/reconcilers" @@ -51,6 +53,7 @@ type Controller struct { ServiceClient corev1client.ServicesGetter NamespaceClient corev1client.NamespacesGetter EventClient corev1client.EventsGetter + healthClient rest.Interface ServiceClusterIPRegistry rangeallocation.RangeRegistry ServiceClusterIPInterval time.Duration @@ -80,7 +83,7 @@ type Controller struct { } // NewBootstrapController returns a controller for watching the core capabilities of the master -func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter) *Controller { +func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, healthClient rest.Interface) *Controller { _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort() if err != nil { klog.Fatalf("failed to get listener address: %v", err) @@ -95,6 +98,7 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega ServiceClient: serviceClient, NamespaceClient: nsClient, EventClient: eventClient, + healthClient: healthClient, EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler, EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval, @@ -138,6 +142,12 @@ func (c *Controller) Start() { return } + // Reconcile during first run removing itself until server is ready. + endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) + if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil { + klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err) + } + repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry) repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry) @@ -150,10 +160,6 @@ func (c *Controller) Start() { // If we fail to repair node ports apiserver is useless. We should restart and retry. klog.Fatalf("Unable to perform initial service nodePort check: %v", err) } - // Service definition is reconciled during first run to correct port and type per expectations. - if err := c.UpdateKubernetesService(true); err != nil { - klog.Errorf("Unable to perform initial Kubernetes service initialization: %v", err) - } c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil) c.runner.Start() @@ -168,7 +174,8 @@ func (c *Controller) Stop() { go func() { defer close(finishedReconciling) klog.Infof("Shutting down kubernetes service endpoint reconciler") - if err := c.EndpointReconciler.StopReconciling(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil { + c.EndpointReconciler.StopReconciling() + if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil { klog.Error(err) } }() @@ -178,7 +185,7 @@ func (c *Controller) Stop() { // done case <-time.After(2 * c.EndpointInterval): // don't block server shutdown forever if we can't reach etcd to remove ourselves - klog.Warning("StopReconciling() timed out") + klog.Warning("RemoveEndpoints() timed out") } } @@ -196,7 +203,14 @@ func (c *Controller) RunKubernetesNamespaces(ch chan struct{}) { // RunKubernetesService periodically updates the kubernetes service func (c *Controller) RunKubernetesService(ch chan struct{}) { - wait.Until(func() { + // wait until process is ready + wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { + var code int + c.healthClient.Get().AbsPath("/healthz").Do().StatusCode(&code) + return code == http.StatusOK, nil + }, ch) + + wait.NonSlidingUntil(func() { // Service definition is not reconciled after first // run, ports and type will be corrected only during // start. diff --git a/pkg/master/master.go b/pkg/master/master.go index b0d2b05f61e..64e81ed9b95 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -375,7 +375,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic. controllerName := "bootstrap-controller" coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) - bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient) + bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient()) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) diff --git a/pkg/master/reconcilers/lease.go b/pkg/master/reconcilers/lease.go index d7ea00b0096..7a0d54080c7 100644 --- a/pkg/master/reconcilers/lease.go +++ b/pkg/master/reconcilers/lease.go @@ -283,14 +283,16 @@ func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []strin return true, ipsCorrect, portsCorrect } -func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { - r.reconcilingLock.Lock() - defer r.reconcilingLock.Unlock() - r.stopReconcilingCalled = true - +func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { if err := r.masterLeases.RemoveLease(ip.String()); err != nil { return err } return r.doReconcile(serviceName, endpointPorts, true) } + +func (r *leaseEndpointReconciler) StopReconciling() { + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + r.stopReconcilingCalled = true +} diff --git a/pkg/master/reconcilers/lease_test.go b/pkg/master/reconcilers/lease_test.go index 3c8402a7867..a3521217712 100644 --- a/pkg/master/reconcilers/lease_test.go +++ b/pkg/master/reconcilers/lease_test.go @@ -547,7 +547,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { } } -func TestLeaseStopReconciling(t *testing.T) { +func TestLeaseRemoveEndpoints(t *testing.T) { ns := corev1.NamespaceDefault om := func(name string) metav1.ObjectMeta { return metav1.ObjectMeta{Namespace: ns, Name: name} @@ -627,7 +627,7 @@ func TestLeaseStopReconciling(t *testing.T) { } } r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases) - err := r.StopReconciling(test.serviceName, net.ParseIP(test.ip), test.endpointPorts) + err := r.RemoveEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } diff --git a/pkg/master/reconcilers/mastercount.go b/pkg/master/reconcilers/mastercount.go index 18a635b6a74..4699e90c172 100644 --- a/pkg/master/reconcilers/mastercount.go +++ b/pkg/master/reconcilers/mastercount.go @@ -137,10 +137,9 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i return err } -func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { +func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { r.reconcilingLock.Lock() defer r.reconcilingLock.Unlock() - r.stopReconcilingCalled = true e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) if err != nil { @@ -167,6 +166,12 @@ func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip n return err } +func (r *masterCountEndpointReconciler) StopReconciling() { + r.reconcilingLock.Lock() + defer r.reconcilingLock.Unlock() + r.stopReconcilingCalled = true +} + // Determine if the endpoint is in the format ReconcileEndpoints expects. // // Return values: diff --git a/pkg/master/reconcilers/none.go b/pkg/master/reconcilers/none.go index 9bd4ee5ad7f..2eb49741bbd 100644 --- a/pkg/master/reconcilers/none.go +++ b/pkg/master/reconcilers/none.go @@ -18,8 +18,9 @@ limitations under the License. package reconcilers import ( - corev1 "k8s.io/api/core/v1" "net" + + corev1 "k8s.io/api/core/v1" ) // NoneEndpointReconciler allows for the endpoint reconciler to be disabled @@ -36,7 +37,10 @@ func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.I return nil } -// StopReconciling noop reconcile -func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { +// RemoveEndpoints noop reconcile +func (r *noneEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { return nil } + +func (r *noneEndpointReconciler) StopReconciling() { +} diff --git a/pkg/master/reconcilers/reconcilers.go b/pkg/master/reconcilers/reconcilers.go index 0cfb9a0aaf8..8f280038765 100644 --- a/pkg/master/reconcilers/reconcilers.go +++ b/pkg/master/reconcilers/reconcilers.go @@ -18,8 +18,9 @@ limitations under the License. package reconcilers import ( - corev1 "k8s.io/api/core/v1" "net" + + corev1 "k8s.io/api/core/v1" ) // EndpointReconciler knows how to reconcile the endpoints for the apiserver service. @@ -35,7 +36,10 @@ type EndpointReconciler interface { // endpoints for their {rw, ro} services. // * ReconcileEndpoints is called periodically from all apiservers. ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error - StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error + // RemoveEndpoints removes this apiserver's lease. + RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error + // StopReconciling turns any later ReconcileEndpoints call into a noop. + StopReconciling() } // Type the reconciler type diff --git a/test/integration/master/synthetic_master_test.go b/test/integration/master/synthetic_master_test.go index b49dcb886ad..6cab439f5f9 100644 --- a/test/integration/master/synthetic_master_test.go +++ b/test/integration/master/synthetic_master_test.go @@ -103,8 +103,16 @@ func TestKubernetesService(t *testing.T) { _, _, closeFn := framework.RunAMaster(config) defer closeFn() coreClient := clientset.NewForConfigOrDie(config.GenericConfig.LoopbackClientConfig) - if _, err := coreClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err != nil { - t.Fatalf("Expected kubernetes service to exists, got: %v", err) + err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (bool, error) { + if _, err := coreClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err != nil && errors.IsNotFound(err) { + return false, nil + } else if err != nil { + return false, err + } + return true, nil + }) + if err != nil { + t.Fatalf("Expected kubernetes service to exist, got: %v", err) } }