diff --git a/pkg/genericapiserver/tunneler.go b/pkg/genericapiserver/tunneler.go index ac65416ea08..4438e0e5149 100644 --- a/pkg/genericapiserver/tunneler.go +++ b/pkg/genericapiserver/tunneler.go @@ -17,8 +17,10 @@ limitations under the License. package genericapiserver import ( + "fmt" "io/ioutil" "net" + "net/http" "net/url" "os" "sync/atomic" @@ -45,6 +47,25 @@ type Tunneler interface { SecondsSinceSSHKeySync() int64 } +// TunnelSyncHealthChecker returns a health func that indicates if a tunneler is healthy. +// It's compatible with healthz.NamedCheck +func TunnelSyncHealthChecker(tunneler Tunneler) func(req *http.Request) error { + return func(req *http.Request) error { + if tunneler == nil { + return nil + } + lag := tunneler.SecondsSinceSync() + if lag > 600 { + return fmt.Errorf("Tunnel sync is taking to long: %d", lag) + } + sshKeyLag := tunneler.SecondsSinceSSHKeySync() + if sshKeyLag > 600 { + return fmt.Errorf("SSHKey sync is taking to long: %d", sshKeyLag) + } + return nil + } +} + type SSHTunneler struct { // Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms // See: https://golang.org/pkg/sync/atomic/ for more information diff --git a/pkg/genericapiserver/tunneler_test.go b/pkg/genericapiserver/tunneler_test.go index 911ab426b41..e37c4630f45 100644 --- a/pkg/genericapiserver/tunneler_test.go +++ b/pkg/genericapiserver/tunneler_test.go @@ -18,6 +18,7 @@ package genericapiserver import ( "fmt" + "net" "os" "path/filepath" "testing" @@ -104,3 +105,31 @@ func TestGenerateSSHKey(t *testing.T) { // TODO: testing error cases where the file can not be removed? } + +type FakeTunneler struct { + SecondsSinceSyncValue int64 + SecondsSinceSSHKeySyncValue int64 +} + +func (t *FakeTunneler) Run(AddressFunc) {} +func (t *FakeTunneler) Stop() {} +func (t *FakeTunneler) Dial(net, addr string) (net.Conn, error) { return nil, nil } +func (t *FakeTunneler) SecondsSinceSync() int64 { return t.SecondsSinceSyncValue } +func (t *FakeTunneler) SecondsSinceSSHKeySync() int64 { return t.SecondsSinceSSHKeySyncValue } + +// TestIsTunnelSyncHealthy verifies that the 600 second lag test +// is honored. +func TestIsTunnelSyncHealthy(t *testing.T) { + tunneler := &FakeTunneler{} + + // Pass case: 540 second lag + tunneler.SecondsSinceSyncValue = 540 + healthFn := TunnelSyncHealthChecker(tunneler) + err := healthFn(nil) + assert.NoError(t, err, "IsTunnelSyncHealthy() should not have returned an error.") + + // Fail case: 720 second lag + tunneler.SecondsSinceSyncValue = 720 + err = healthFn(nil) + assert.Error(t, err, "IsTunnelSyncHealthy() should have returned an error.") +} diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 423acb77c1a..578c87b2834 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -26,9 +26,11 @@ import ( "k8s.io/kubernetes/pkg/api/endpoints" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/rest" - "k8s.io/kubernetes/pkg/registry/core/endpoint" + coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" + "k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/registry/core/namespace" "k8s.io/kubernetes/pkg/registry/core/rangeallocation" + corerest "k8s.io/kubernetes/pkg/registry/core/rest" "k8s.io/kubernetes/pkg/registry/core/service" servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller" portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller" @@ -72,6 +74,42 @@ type Controller struct { runner *async.Runner } +// NewBootstrapController returns a controller for watching the core capabilities of the master +func (c *Config) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage) *Controller { + return &Controller{ + NamespaceRegistry: legacyRESTStorage.NamespaceRegistry, + ServiceRegistry: legacyRESTStorage.ServiceRegistry, + + EndpointReconciler: c.EndpointReconcilerConfig.Reconciler, + EndpointInterval: c.EndpointReconcilerConfig.Interval, + + SystemNamespaces: []string{api.NamespaceSystem}, + SystemNamespacesInterval: 1 * time.Minute, + + ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator, + ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange, + ServiceClusterIPInterval: 3 * time.Minute, + + ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator, + ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange, + ServiceNodePortInterval: 3 * time.Minute, + + PublicIP: c.GenericConfig.PublicAddress, + + ServiceIP: c.GenericConfig.ServiceReadWriteIP, + ServicePort: c.GenericConfig.ServiceReadWritePort, + ExtraServicePorts: c.GenericConfig.ExtraServicePorts, + ExtraEndpointPorts: c.GenericConfig.ExtraEndpointPorts, + PublicServicePort: c.GenericConfig.ReadWritePort, + KubernetesServiceNodePort: c.GenericConfig.KubernetesServiceNodePort, + } +} + +func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error { + c.Start() + return nil +} + // Start begins the core controller loops that must exist for bootstrapping // a cluster. func (c *Controller) Start() { @@ -257,18 +295,18 @@ type EndpointReconciler interface { // masterCountEndpointReconciler reconciles endpoints based on a specified expected number of // masters. masterCountEndpointReconciler implements EndpointReconciler. type masterCountEndpointReconciler struct { - masterCount int - endpointRegistry endpoint.Registry + masterCount int + endpointClient coreclient.EndpointsGetter } var _ EndpointReconciler = &masterCountEndpointReconciler{} // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a // specified expected number of masters. -func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint.Registry) *masterCountEndpointReconciler { +func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient.EndpointsGetter) *masterCountEndpointReconciler { return &masterCountEndpointReconciler{ - masterCount: masterCount, - endpointRegistry: endpointRegistry, + masterCount: masterCount, + endpointClient: endpointClient, } } @@ -285,8 +323,7 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint // to be running (c.masterCount). // * ReconcileEndpoints is called periodically from all apiservers. func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { - ctx := api.NewDefaultContext() - e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName) + e, err := r.endpointClient.Endpoints(api.NamespaceDefault).Get(serviceName) if err != nil { e = &api.Endpoints{ ObjectMeta: api.ObjectMeta{ @@ -301,7 +338,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i Addresses: []api.EndpointAddress{{IP: ip.String()}}, Ports: endpointPorts, }} - return r.endpointRegistry.UpdateEndpoints(ctx, e) + _, err = r.endpointClient.Endpoints(api.NamespaceDefault).Create(e) + return err } // First, determine if the endpoint is in the format we expect (one @@ -314,7 +352,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i Ports: endpointPorts, }} glog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e) - return r.endpointRegistry.UpdateEndpoints(ctx, e) + _, err = r.endpointClient.Endpoints(api.NamespaceDefault).Update(e) + return err } if ipCorrect && portsCorrect { return nil @@ -349,7 +388,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i e.Subsets[0].Ports = endpointPorts } glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) - return r.endpointRegistry.UpdateEndpoints(ctx, e) + _, err = r.endpointClient.Endpoints(api.NamespaceDefault).Update(e) + return err } // Determine if the endpoint is in the format ReconcileEndpoints expects. diff --git a/pkg/master/controller_test.go b/pkg/master/controller_test.go index 1d3278711f2..11bcf974dbc 100644 --- a/pkg/master/controller_test.go +++ b/pkg/master/controller_test.go @@ -23,6 +23,8 @@ import ( "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/util/intstr" ) @@ -40,6 +42,7 @@ func TestReconcileEndpoints(t *testing.T) { additionalMasters int endpoints *api.EndpointsList expectUpdate *api.Endpoints // nil means none expected + expectCreate *api.Endpoints // nil means none expected }{ { testName: "no existing endpoints", @@ -47,7 +50,7 @@ func TestReconcileEndpoints(t *testing.T) { ip: "1.2.3.4", endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: nil, - expectUpdate: &api.Endpoints{ + expectCreate: &api.Endpoints{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, @@ -222,7 +225,7 @@ func TestReconcileEndpoints(t *testing.T) { }}, }}, }, - expectUpdate: &api.Endpoints{ + expectCreate: &api.Endpoints{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, @@ -371,24 +374,52 @@ func TestReconcileEndpoints(t *testing.T) { }, } for _, test := range reconcile_tests { - registry := ®istrytest.EndpointRegistry{ - Endpoints: test.endpoints, + fakeClient := fake.NewSimpleClientset() + if test.endpoints != nil { + fakeClient = fake.NewSimpleClientset(test.endpoints) } - reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry) + reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core()) err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } + + updates := []core.UpdateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() != "update" { + continue + } + updates = append(updates, action.(core.UpdateAction)) + } if test.expectUpdate != nil { - if len(registry.Updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) - } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + if len(updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, updates) + } else if e, a := test.expectUpdate, updates[0].GetObject(); !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } - if test.expectUpdate == nil && len(registry.Updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) + if test.expectUpdate == nil && len(updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) } + + creates := []core.CreateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() != "create" { + continue + } + creates = append(creates, action.(core.CreateAction)) + } + if test.expectCreate != nil { + if len(creates) != 1 { + t.Errorf("case %q: unexpected creates: %v", test.testName, creates) + } else if e, a := test.expectCreate, creates[0].GetObject(); !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + if test.expectCreate == nil && len(creates) > 0 { + t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates) + } + } non_reconcile_tests := []struct { @@ -399,6 +430,7 @@ func TestReconcileEndpoints(t *testing.T) { additionalMasters int endpoints *api.EndpointsList expectUpdate *api.Endpoints // nil means none expected + expectCreate *api.Endpoints // nil means none expected }{ { testName: "existing endpoints extra service ports missing port no update", @@ -450,7 +482,7 @@ func TestReconcileEndpoints(t *testing.T) { ip: "1.2.3.4", endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: nil, - expectUpdate: &api.Endpoints{ + expectCreate: &api.Endpoints{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, @@ -460,24 +492,52 @@ func TestReconcileEndpoints(t *testing.T) { }, } for _, test := range non_reconcile_tests { - registry := ®istrytest.EndpointRegistry{ - Endpoints: test.endpoints, + fakeClient := fake.NewSimpleClientset() + if test.endpoints != nil { + fakeClient = fake.NewSimpleClientset(test.endpoints) } - reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry) + reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core()) err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } + + updates := []core.UpdateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() != "update" { + continue + } + updates = append(updates, action.(core.UpdateAction)) + } if test.expectUpdate != nil { - if len(registry.Updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) - } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + if len(updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, updates) + } else if e, a := test.expectUpdate, updates[0].GetObject(); !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } - if test.expectUpdate == nil && len(registry.Updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) + if test.expectUpdate == nil && len(updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) } + + creates := []core.CreateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() != "create" { + continue + } + creates = append(creates, action.(core.CreateAction)) + } + if test.expectCreate != nil { + if len(creates) != 1 { + t.Errorf("case %q: unexpected creates: %v", test.testName, creates) + } else if e, a := test.expectCreate, creates[0].GetObject(); !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + if test.expectCreate == nil && len(creates) > 0 { + t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates) + } + } } diff --git a/pkg/master/master.go b/pkg/master/master.go index 8d4b9ec8044..c13b59305c4 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -19,7 +19,6 @@ package master import ( "fmt" "net" - "net/http" "net/url" "strconv" "strings" @@ -51,6 +50,7 @@ import ( "k8s.io/kubernetes/pkg/apis/storage" storageapiv1beta1 "k8s.io/kubernetes/pkg/apis/storage/v1beta1" "k8s.io/kubernetes/pkg/apiserver" + coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned" "k8s.io/kubernetes/pkg/genericapiserver" "k8s.io/kubernetes/pkg/healthz" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" @@ -123,9 +123,7 @@ type Master struct { *genericapiserver.GenericAPIServer legacyRESTStorageProvider corerest.LegacyRESTStorageProvider - legacyRESTStorage corerest.LegacyRESTStorage - enableCoreControllers bool deleteCollectionWorkers int // storage for third party objects @@ -137,8 +135,8 @@ type Master struct { // Useful for reliable testing. Shouldn't be used otherwise. disableThirdPartyControllerForTesting bool - // Used to start and monitor tunneling - tunneler genericapiserver.Tunneler + // nodeClient is used to back the tunneler + nodeClient coreclient.NodeInterface restOptionsFactory restOptionsFactory } @@ -168,6 +166,16 @@ func (c *Config) Complete() completedConfig { // enable swagger UI only if general UI support is on c.GenericConfig.EnableSwaggerUI = c.GenericConfig.EnableSwaggerUI && c.EnableUISupport + if c.EndpointReconcilerConfig.Interval == 0 { + c.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval + } + + if c.EndpointReconcilerConfig.Reconciler == nil { + // use a default endpoint reconciler if nothing is set + endpointClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) + c.EndpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(c.GenericConfig.MasterCount, endpointClient) + } + return completedConfig{c} } @@ -199,9 +207,8 @@ func (c completedConfig) New() (*Master, error) { m := &Master{ GenericAPIServer: s, - enableCoreControllers: c.EnableCoreControllers, deleteCollectionWorkers: c.DeleteCollectionWorkers, - tunneler: c.Tunneler, + nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(), disableThirdPartyControllerForTesting: c.disableThirdPartyControllerForTesting, @@ -248,11 +255,6 @@ func (c completedConfig) New() (*Master, error) { c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{} m.InstallAPIs(c.Config) - // TODO: Attempt clean shutdown? - if m.enableCoreControllers { - m.NewBootstrapController(c.EndpointReconcilerConfig).Start() - } - return m, nil } @@ -269,20 +271,26 @@ func (m *Master) InstallAPIs(c *Config) { if err != nil { glog.Fatalf("Error building core storage: %v", err) } - m.legacyRESTStorage = legacyRESTStorage + + if c.EnableCoreControllers { + bootstrapController := c.NewBootstrapController(legacyRESTStorage) + if err := m.GenericAPIServer.AddPostStartHook("bootstrap-controller", bootstrapController.PostStartHook); err != nil { + glog.Fatalf("Error registering PostStartHook %q: %v", "bootstrap-controller", err) + } + } apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) } // Run the tunneler. healthzChecks := []healthz.HealthzChecker{} - if m.tunneler != nil { - m.tunneler.Run(m.getNodeAddresses) - healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy)) + if c.Tunneler != nil { + c.Tunneler.Run(m.getNodeAddresses) + healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", genericapiserver.TunnelSyncHealthChecker(c.Tunneler))) prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "apiserver_proxy_tunnel_sync_latency_secs", Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.", - }, func() float64 { return float64(m.tunneler.SecondsSinceSync()) }) + }, func() float64 { return float64(c.Tunneler.SecondsSinceSync()) }) } healthz.InstallHandler(&m.HandlerContainer.NonSwaggerRoutes, healthzChecks...) @@ -338,51 +346,6 @@ func (m *Master) InstallAPIs(c *Config) { } } -// NewBootstrapController returns a controller for watching the core capabilities of the master. If -// endpointReconcilerConfig.Interval is 0, the default value of DefaultEndpointReconcilerInterval -// will be used instead. If endpointReconcilerConfig.Reconciler is nil, the default -// MasterCountEndpointReconciler will be used. -// TODO this should be kicked off as a server PostHook -func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconcilerConfig) *Controller { - if endpointReconcilerConfig.Interval == 0 { - endpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval - } - - if endpointReconcilerConfig.Reconciler == nil { - // use a default endpoint reconciler if nothing is set - // m.endpointRegistry is set via m.InstallAPIs -> m.initV1ResourcesStorage - endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.legacyRESTStorage.EndpointRegistry) - } - - return &Controller{ - NamespaceRegistry: m.legacyRESTStorage.NamespaceRegistry, - ServiceRegistry: m.legacyRESTStorage.ServiceRegistry, - - EndpointReconciler: endpointReconcilerConfig.Reconciler, - EndpointInterval: endpointReconcilerConfig.Interval, - - SystemNamespaces: []string{api.NamespaceSystem}, - SystemNamespacesInterval: 1 * time.Minute, - - ServiceClusterIPRegistry: m.legacyRESTStorage.ServiceClusterIPAllocator, - ServiceClusterIPRange: m.legacyRESTStorageProvider.ServiceClusterIPRange, - ServiceClusterIPInterval: 3 * time.Minute, - - ServiceNodePortRegistry: m.legacyRESTStorage.ServiceNodePortAllocator, - ServiceNodePortRange: m.legacyRESTStorageProvider.ServiceNodePortRange, - ServiceNodePortInterval: 3 * time.Minute, - - PublicIP: m.ClusterIP, - - ServiceIP: m.ServiceReadWriteIP, - ServicePort: m.ServiceReadWritePort, - ExtraServicePorts: m.ExtraServicePorts, - ExtraEndpointPorts: m.ExtraEndpointPorts, - PublicServicePort: m.PublicReadWritePort, - KubernetesServiceNodePort: m.KubernetesServiceNodePort, - } -} - func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[string]apiserver.Server { serversToValidate := map[string]apiserver.Server{ "controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"}, @@ -697,7 +660,7 @@ func findExternalAddress(node *api.Node) (string, error) { } func (m *Master) getNodeAddresses() ([]string, error) { - nodes, err := m.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil) + nodes, err := m.nodeClient.List(api.ListOptions{}) if err != nil { return nil, err } @@ -713,21 +676,6 @@ func (m *Master) getNodeAddresses() ([]string, error) { return addrs, nil } -func (m *Master) IsTunnelSyncHealthy(req *http.Request) error { - if m.tunneler == nil { - return nil - } - lag := m.tunneler.SecondsSinceSync() - if lag > 600 { - return fmt.Errorf("Tunnel sync is taking to long: %d", lag) - } - sshKeyLag := m.tunneler.SecondsSinceSSHKeySync() - if sshKeyLag > 600 { - return fmt.Errorf("SSHKey sync is taking to long: %d", sshKeyLag) - } - return nil -} - func DefaultAPIResourceConfigSource() *genericapiserver.ResourceConfig { ret := genericapiserver.NewResourceConfig() ret.EnableVersions( diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index f28e434f3d4..fe3cfa9c47f 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -44,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/apis/rbac" + "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/generated/openapi" "k8s.io/kubernetes/pkg/genericapiserver" @@ -67,9 +68,6 @@ import ( func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) { server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t) - master := &Master{ - GenericAPIServer: &genericapiserver.GenericAPIServer{}, - } config := &Config{ GenericConfig: &genericapiserver.Config{}, } @@ -85,6 +83,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. storageFactory := genericapiserver.NewDefaultStorageFactory(*storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource()) config.StorageFactory = storageFactory + config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}} config.GenericConfig.APIResourceConfigSource = DefaultAPIResourceConfigSource() config.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4") config.GenericConfig.Serializer = api.Codecs @@ -112,7 +111,8 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert. t.Fatal(err) } - master.legacyRESTStorage.NodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{}) + fakeNodeClient := fake.NewSimpleClientset(registrytest.MakeNodeList([]string{"node1", "node2"}, api.NodeResources{})) + master.nodeClient = fakeNodeClient.Core().Nodes() return master, server, *config, assert.New(t) } @@ -161,8 +161,6 @@ func TestNew(t *testing.T) { defer etcdserver.Terminate(t) // Verify many of the variables match their config counterparts - assert.Equal(master.enableCoreControllers, config.EnableCoreControllers) - assert.Equal(master.tunneler, config.Tunneler) assert.Equal(master.RequestContextMapper(), config.GenericConfig.RequestContextMapper) assert.Equal(master.ClusterIP, config.GenericConfig.PublicAddress) @@ -258,25 +256,27 @@ func TestGetNodeAddresses(t *testing.T) { defer etcdserver.Terminate(t) // Fail case (no addresses associated with nodes) - nodes, _ := master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil) + nodes, _ := master.nodeClient.List(api.ListOptions{}) addrs, err := master.getNodeAddresses() assert.Error(err, "getNodeAddresses should have caused an error as there are no addresses.") assert.Equal([]string(nil), addrs) // Pass case with External type IP - nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil) + nodes, _ = master.nodeClient.List(api.ListOptions{}) for index := range nodes.Items { nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeExternalIP, Address: "127.0.0.1"}} + master.nodeClient.Update(&nodes.Items[index]) } addrs, err = master.getNodeAddresses() assert.NoError(err, "getNodeAddresses should not have returned an error.") assert.Equal([]string{"127.0.0.1", "127.0.0.1"}, addrs) // Pass case with LegacyHost type IP - nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil) + nodes, _ = master.nodeClient.List(api.ListOptions{}) for index := range nodes.Items { nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.2"}} + master.nodeClient.Update(&nodes.Items[index]) } addrs, err = master.getNodeAddresses() assert.NoError(err, "getNodeAddresses failback should not have returned an error.") @@ -484,38 +484,6 @@ func TestDiscoveryAtAPIS(t *testing.T) { } } -type FakeTunneler struct { - SecondsSinceSyncValue int64 - SecondsSinceSSHKeySyncValue int64 -} - -func (t *FakeTunneler) Run(genericapiserver.AddressFunc) {} -func (t *FakeTunneler) Stop() {} -func (t *FakeTunneler) Dial(net, addr string) (net.Conn, error) { return nil, nil } -func (t *FakeTunneler) SecondsSinceSync() int64 { return t.SecondsSinceSyncValue } -func (t *FakeTunneler) SecondsSinceSSHKeySync() int64 { return t.SecondsSinceSSHKeySyncValue } - -// TestIsTunnelSyncHealthy verifies that the 600 second lag test -// is honored. -func TestIsTunnelSyncHealthy(t *testing.T) { - assert := assert.New(t) - tunneler := &FakeTunneler{} - master := &Master{ - GenericAPIServer: &genericapiserver.GenericAPIServer{}, - tunneler: tunneler, - } - - // Pass case: 540 second lag - tunneler.SecondsSinceSyncValue = 540 - err := master.IsTunnelSyncHealthy(nil) - assert.NoError(err, "IsTunnelSyncHealthy() should not have returned an error.") - - // Fail case: 720 second lag - tunneler.SecondsSinceSyncValue = 720 - err = master.IsTunnelSyncHealthy(nil) - assert.Error(err, "IsTunnelSyncHealthy() should have returned an error.") -} - func writeResponseToFile(resp *http.Response, filename string) error { defer resp.Body.Close() diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 61fd204f861..d2903b65bcc 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -30,6 +30,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/autoscaling" @@ -45,6 +46,7 @@ import ( authorizerunion "k8s.io/kubernetes/pkg/auth/authorizer/union" "k8s.io/kubernetes/pkg/auth/user" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -60,6 +62,7 @@ import ( "k8s.io/kubernetes/pkg/storage/storagebackend" utilnet "k8s.io/kubernetes/pkg/util/net" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/plugin/pkg/admission/admit" authenticatorunion "k8s.io/kubernetes/plugin/pkg/auth/authenticator/request/union" @@ -257,6 +260,28 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv // fire the post hooks ourselves m.GenericAPIServer.RunPostStartHooks() + // wait for services to be ready + if masterConfig.EnableCoreControllers { + // TODO Once /healthz is updated for posthooks, we'll wait for good health + coreClient := coreclient.NewForConfigOrDie(&cfg) + svcWatch, err := coreClient.Services(api.NamespaceDefault).Watch(v1.ListOptions{}) + if err != nil { + glog.Fatal(err) + } + _, err = watch.Until(30*time.Second, svcWatch, func(event watch.Event) (bool, error) { + if event.Type != watch.Added { + return false, nil + } + if event.Object.(*v1.Service).Name == "kubernetes" { + return true, nil + } + return false, nil + }) + if err != nil { + glog.Fatal(err) + } + } + return m, s } @@ -333,9 +358,10 @@ func NewMasterConfig() *master.Config { OpenAPIDefinitions: openapi.OpenAPIDefinitions, EnableOpenAPISupport: true, }, - StorageFactory: storageFactory, - EnableWatchCache: true, - KubeletClient: kubeletclient.FakeKubeletClient{}, + StorageFactory: storageFactory, + EnableCoreControllers: true, + EnableWatchCache: true, + KubeletClient: kubeletclient.FakeKubeletClient{}, } } diff --git a/test/integration/openshift/openshift_test.go b/test/integration/openshift/openshift_test.go index 4bac356af15..afbd51c849e 100644 --- a/test/integration/openshift/openshift_test.go +++ b/test/integration/openshift/openshift_test.go @@ -34,8 +34,7 @@ func TestMasterExportsSymbols(t *testing.T) { EnableUISupport: false, EnableLogsSupport: false, } - m := &master.Master{ + _ = &master.Master{ GenericAPIServer: &genericapiserver.GenericAPIServer{}, } - _ = (m).NewBootstrapController(master.EndpointReconcilerConfig{}) } diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go index 74cea08ea6a..effe57cfda2 100644 --- a/test/integration/replicaset/replicaset_test.go +++ b/test/integration/replicaset/replicaset_test.go @@ -129,7 +129,6 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replicaset.ReplicaSetController, cache.SharedIndexInformer, clientset.Interface) { masterConfig := framework.NewIntegrationTestMasterConfig() - masterConfig.EnableCoreControllers = false _, s := framework.RunAMaster(masterConfig) config := restclient.Config{Host: s.URL} diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index 8ba050d74bc..e5c3e078d45 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -126,7 +126,6 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replication.ReplicationManager, cache.SharedIndexInformer, clientset.Interface) { masterConfig := framework.NewIntegrationTestMasterConfig() - masterConfig.EnableCoreControllers = false _, s := framework.RunAMaster(masterConfig) config := restclient.Config{Host: s.URL}