convert bootstrap controller to posthook to tighten master.go

This commit is contained in:
deads2k 2016-09-30 12:06:54 -04:00
parent 4bf35b6827
commit 5d3a210321
10 changed files with 245 additions and 156 deletions

View File

@ -17,8 +17,10 @@ limitations under the License.
package genericapiserver
import (
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"sync/atomic"
@ -45,6 +47,25 @@ type Tunneler interface {
SecondsSinceSSHKeySync() int64
}
// TunnelSyncHealthChecker returns a health func that indicates if a tunneler is healthy.
// It's compatible with healthz.NamedCheck
func TunnelSyncHealthChecker(tunneler Tunneler) func(req *http.Request) error {
return func(req *http.Request) error {
if tunneler == nil {
return nil
}
lag := tunneler.SecondsSinceSync()
if lag > 600 {
return fmt.Errorf("Tunnel sync is taking to long: %d", lag)
}
sshKeyLag := tunneler.SecondsSinceSSHKeySync()
if sshKeyLag > 600 {
return fmt.Errorf("SSHKey sync is taking to long: %d", sshKeyLag)
}
return nil
}
}
type SSHTunneler struct {
// Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information

View File

@ -18,6 +18,7 @@ package genericapiserver
import (
"fmt"
"net"
"os"
"path/filepath"
"testing"
@ -104,3 +105,31 @@ func TestGenerateSSHKey(t *testing.T) {
// TODO: testing error cases where the file can not be removed?
}
type FakeTunneler struct {
SecondsSinceSyncValue int64
SecondsSinceSSHKeySyncValue int64
}
func (t *FakeTunneler) Run(AddressFunc) {}
func (t *FakeTunneler) Stop() {}
func (t *FakeTunneler) Dial(net, addr string) (net.Conn, error) { return nil, nil }
func (t *FakeTunneler) SecondsSinceSync() int64 { return t.SecondsSinceSyncValue }
func (t *FakeTunneler) SecondsSinceSSHKeySync() int64 { return t.SecondsSinceSSHKeySyncValue }
// TestIsTunnelSyncHealthy verifies that the 600 second lag test
// is honored.
func TestIsTunnelSyncHealthy(t *testing.T) {
tunneler := &FakeTunneler{}
// Pass case: 540 second lag
tunneler.SecondsSinceSyncValue = 540
healthFn := TunnelSyncHealthChecker(tunneler)
err := healthFn(nil)
assert.NoError(t, err, "IsTunnelSyncHealthy() should not have returned an error.")
// Fail case: 720 second lag
tunneler.SecondsSinceSyncValue = 720
err = healthFn(nil)
assert.Error(t, err, "IsTunnelSyncHealthy() should have returned an error.")
}

View File

@ -26,9 +26,11 @@ import (
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/registry/core/namespace"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
"k8s.io/kubernetes/pkg/registry/core/service"
servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
@ -72,6 +74,42 @@ type Controller struct {
runner *async.Runner
}
// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *Config) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage) *Controller {
return &Controller{
NamespaceRegistry: legacyRESTStorage.NamespaceRegistry,
ServiceRegistry: legacyRESTStorage.ServiceRegistry,
EndpointReconciler: c.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.EndpointReconcilerConfig.Interval,
SystemNamespaces: []string{api.NamespaceSystem},
SystemNamespacesInterval: 1 * time.Minute,
ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceClusterIPInterval: 3 * time.Minute,
ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
ServiceNodePortInterval: 3 * time.Minute,
PublicIP: c.GenericConfig.PublicAddress,
ServiceIP: c.GenericConfig.ServiceReadWriteIP,
ServicePort: c.GenericConfig.ServiceReadWritePort,
ExtraServicePorts: c.GenericConfig.ExtraServicePorts,
ExtraEndpointPorts: c.GenericConfig.ExtraEndpointPorts,
PublicServicePort: c.GenericConfig.ReadWritePort,
KubernetesServiceNodePort: c.GenericConfig.KubernetesServiceNodePort,
}
}
func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
c.Start()
return nil
}
// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start() {
@ -257,18 +295,18 @@ type EndpointReconciler interface {
// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
// masters. masterCountEndpointReconciler implements EndpointReconciler.
type masterCountEndpointReconciler struct {
masterCount int
endpointRegistry endpoint.Registry
masterCount int
endpointClient coreclient.EndpointsGetter
}
var _ EndpointReconciler = &masterCountEndpointReconciler{}
// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
// specified expected number of masters.
func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint.Registry) *masterCountEndpointReconciler {
func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient.EndpointsGetter) *masterCountEndpointReconciler {
return &masterCountEndpointReconciler{
masterCount: masterCount,
endpointRegistry: endpointRegistry,
masterCount: masterCount,
endpointClient: endpointClient,
}
}
@ -285,8 +323,7 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint
// to be running (c.masterCount).
// * ReconcileEndpoints is called periodically from all apiservers.
func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := api.NewDefaultContext()
e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName)
e, err := r.endpointClient.Endpoints(api.NamespaceDefault).Get(serviceName)
if err != nil {
e = &api.Endpoints{
ObjectMeta: api.ObjectMeta{
@ -301,7 +338,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
Addresses: []api.EndpointAddress{{IP: ip.String()}},
Ports: endpointPorts,
}}
return r.endpointRegistry.UpdateEndpoints(ctx, e)
_, err = r.endpointClient.Endpoints(api.NamespaceDefault).Create(e)
return err
}
// First, determine if the endpoint is in the format we expect (one
@ -314,7 +352,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
Ports: endpointPorts,
}}
glog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
return r.endpointRegistry.UpdateEndpoints(ctx, e)
_, err = r.endpointClient.Endpoints(api.NamespaceDefault).Update(e)
return err
}
if ipCorrect && portsCorrect {
return nil
@ -349,7 +388,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
e.Subsets[0].Ports = endpointPorts
}
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
return r.endpointRegistry.UpdateEndpoints(ctx, e)
_, err = r.endpointClient.Endpoints(api.NamespaceDefault).Update(e)
return err
}
// Determine if the endpoint is in the format ReconcileEndpoints expects.

View File

@ -23,6 +23,8 @@ import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/util/intstr"
)
@ -40,6 +42,7 @@ func TestReconcileEndpoints(t *testing.T) {
additionalMasters int
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
expectCreate *api.Endpoints // nil means none expected
}{
{
testName: "no existing endpoints",
@ -47,7 +50,7 @@ func TestReconcileEndpoints(t *testing.T) {
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil,
expectUpdate: &api.Endpoints{
expectCreate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
@ -222,7 +225,7 @@ func TestReconcileEndpoints(t *testing.T) {
}},
}},
},
expectUpdate: &api.Endpoints{
expectCreate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
@ -371,24 +374,52 @@ func TestReconcileEndpoints(t *testing.T) {
},
}
for _, test := range reconcile_tests {
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
fakeClient := fake.NewSimpleClientset()
if test.endpoints != nil {
fakeClient = fake.NewSimpleClientset(test.endpoints)
}
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry)
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core())
err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "update" {
continue
}
updates = append(updates, action.(core.UpdateAction))
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else if e, a := test.expectUpdate, updates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "create" {
continue
}
creates = append(creates, action.(core.CreateAction))
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creates: %v", test.testName, creates)
} else if e, a := test.expectCreate, creates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectCreate == nil && len(creates) > 0 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
}
non_reconcile_tests := []struct {
@ -399,6 +430,7 @@ func TestReconcileEndpoints(t *testing.T) {
additionalMasters int
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
expectCreate *api.Endpoints // nil means none expected
}{
{
testName: "existing endpoints extra service ports missing port no update",
@ -450,7 +482,7 @@ func TestReconcileEndpoints(t *testing.T) {
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil,
expectUpdate: &api.Endpoints{
expectCreate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
@ -460,24 +492,52 @@ func TestReconcileEndpoints(t *testing.T) {
},
}
for _, test := range non_reconcile_tests {
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
fakeClient := fake.NewSimpleClientset()
if test.endpoints != nil {
fakeClient = fake.NewSimpleClientset(test.endpoints)
}
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry)
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core())
err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "update" {
continue
}
updates = append(updates, action.(core.UpdateAction))
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else if e, a := test.expectUpdate, updates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "create" {
continue
}
creates = append(creates, action.(core.CreateAction))
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creates: %v", test.testName, creates)
} else if e, a := test.expectCreate, creates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectCreate == nil && len(creates) > 0 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
}
}

View File

@ -19,7 +19,6 @@ package master
import (
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
@ -51,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/apis/storage"
storageapiv1beta1 "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
"k8s.io/kubernetes/pkg/apiserver"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/healthz"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
@ -123,9 +123,7 @@ type Master struct {
*genericapiserver.GenericAPIServer
legacyRESTStorageProvider corerest.LegacyRESTStorageProvider
legacyRESTStorage corerest.LegacyRESTStorage
enableCoreControllers bool
deleteCollectionWorkers int
// storage for third party objects
@ -137,8 +135,8 @@ type Master struct {
// Useful for reliable testing. Shouldn't be used otherwise.
disableThirdPartyControllerForTesting bool
// Used to start and monitor tunneling
tunneler genericapiserver.Tunneler
// nodeClient is used to back the tunneler
nodeClient coreclient.NodeInterface
restOptionsFactory restOptionsFactory
}
@ -168,6 +166,16 @@ func (c *Config) Complete() completedConfig {
// enable swagger UI only if general UI support is on
c.GenericConfig.EnableSwaggerUI = c.GenericConfig.EnableSwaggerUI && c.EnableUISupport
if c.EndpointReconcilerConfig.Interval == 0 {
c.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
}
if c.EndpointReconcilerConfig.Reconciler == nil {
// use a default endpoint reconciler if nothing is set
endpointClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
c.EndpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(c.GenericConfig.MasterCount, endpointClient)
}
return completedConfig{c}
}
@ -199,9 +207,8 @@ func (c completedConfig) New() (*Master, error) {
m := &Master{
GenericAPIServer: s,
enableCoreControllers: c.EnableCoreControllers,
deleteCollectionWorkers: c.DeleteCollectionWorkers,
tunneler: c.Tunneler,
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),
disableThirdPartyControllerForTesting: c.disableThirdPartyControllerForTesting,
@ -248,11 +255,6 @@ func (c completedConfig) New() (*Master, error) {
c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{}
m.InstallAPIs(c.Config)
// TODO: Attempt clean shutdown?
if m.enableCoreControllers {
m.NewBootstrapController(c.EndpointReconcilerConfig).Start()
}
return m, nil
}
@ -269,20 +271,26 @@ func (m *Master) InstallAPIs(c *Config) {
if err != nil {
glog.Fatalf("Error building core storage: %v", err)
}
m.legacyRESTStorage = legacyRESTStorage
if c.EnableCoreControllers {
bootstrapController := c.NewBootstrapController(legacyRESTStorage)
if err := m.GenericAPIServer.AddPostStartHook("bootstrap-controller", bootstrapController.PostStartHook); err != nil {
glog.Fatalf("Error registering PostStartHook %q: %v", "bootstrap-controller", err)
}
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
}
// Run the tunneler.
healthzChecks := []healthz.HealthzChecker{}
if m.tunneler != nil {
m.tunneler.Run(m.getNodeAddresses)
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", m.IsTunnelSyncHealthy))
if c.Tunneler != nil {
c.Tunneler.Run(m.getNodeAddresses)
healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", genericapiserver.TunnelSyncHealthChecker(c.Tunneler)))
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "apiserver_proxy_tunnel_sync_latency_secs",
Help: "The time since the last successful synchronization of the SSH tunnels for proxy requests.",
}, func() float64 { return float64(m.tunneler.SecondsSinceSync()) })
}, func() float64 { return float64(c.Tunneler.SecondsSinceSync()) })
}
healthz.InstallHandler(&m.HandlerContainer.NonSwaggerRoutes, healthzChecks...)
@ -338,51 +346,6 @@ func (m *Master) InstallAPIs(c *Config) {
}
}
// NewBootstrapController returns a controller for watching the core capabilities of the master. If
// endpointReconcilerConfig.Interval is 0, the default value of DefaultEndpointReconcilerInterval
// will be used instead. If endpointReconcilerConfig.Reconciler is nil, the default
// MasterCountEndpointReconciler will be used.
// TODO this should be kicked off as a server PostHook
func (m *Master) NewBootstrapController(endpointReconcilerConfig EndpointReconcilerConfig) *Controller {
if endpointReconcilerConfig.Interval == 0 {
endpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
}
if endpointReconcilerConfig.Reconciler == nil {
// use a default endpoint reconciler if nothing is set
// m.endpointRegistry is set via m.InstallAPIs -> m.initV1ResourcesStorage
endpointReconcilerConfig.Reconciler = NewMasterCountEndpointReconciler(m.MasterCount, m.legacyRESTStorage.EndpointRegistry)
}
return &Controller{
NamespaceRegistry: m.legacyRESTStorage.NamespaceRegistry,
ServiceRegistry: m.legacyRESTStorage.ServiceRegistry,
EndpointReconciler: endpointReconcilerConfig.Reconciler,
EndpointInterval: endpointReconcilerConfig.Interval,
SystemNamespaces: []string{api.NamespaceSystem},
SystemNamespacesInterval: 1 * time.Minute,
ServiceClusterIPRegistry: m.legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: m.legacyRESTStorageProvider.ServiceClusterIPRange,
ServiceClusterIPInterval: 3 * time.Minute,
ServiceNodePortRegistry: m.legacyRESTStorage.ServiceNodePortAllocator,
ServiceNodePortRange: m.legacyRESTStorageProvider.ServiceNodePortRange,
ServiceNodePortInterval: 3 * time.Minute,
PublicIP: m.ClusterIP,
ServiceIP: m.ServiceReadWriteIP,
ServicePort: m.ServiceReadWritePort,
ExtraServicePorts: m.ExtraServicePorts,
ExtraEndpointPorts: m.ExtraEndpointPorts,
PublicServicePort: m.PublicReadWritePort,
KubernetesServiceNodePort: m.KubernetesServiceNodePort,
}
}
func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[string]apiserver.Server {
serversToValidate := map[string]apiserver.Server{
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
@ -697,7 +660,7 @@ func findExternalAddress(node *api.Node) (string, error) {
}
func (m *Master) getNodeAddresses() ([]string, error) {
nodes, err := m.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, err := m.nodeClient.List(api.ListOptions{})
if err != nil {
return nil, err
}
@ -713,21 +676,6 @@ func (m *Master) getNodeAddresses() ([]string, error) {
return addrs, nil
}
func (m *Master) IsTunnelSyncHealthy(req *http.Request) error {
if m.tunneler == nil {
return nil
}
lag := m.tunneler.SecondsSinceSync()
if lag > 600 {
return fmt.Errorf("Tunnel sync is taking to long: %d", lag)
}
sshKeyLag := m.tunneler.SecondsSinceSSHKeySync()
if sshKeyLag > 600 {
return fmt.Errorf("SSHKey sync is taking to long: %d", sshKeyLag)
}
return nil
}
func DefaultAPIResourceConfigSource() *genericapiserver.ResourceConfig {
ret := genericapiserver.NewResourceConfig()
ret.EnableVersions(

View File

@ -44,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apis/rbac"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/generated/openapi"
"k8s.io/kubernetes/pkg/genericapiserver"
@ -67,9 +68,6 @@ import (
func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.Assertions) {
server, storageConfig := etcdtesting.NewUnsecuredEtcd3TestClientServer(t)
master := &Master{
GenericAPIServer: &genericapiserver.GenericAPIServer{},
}
config := &Config{
GenericConfig: &genericapiserver.Config{},
}
@ -85,6 +83,7 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
storageFactory := genericapiserver.NewDefaultStorageFactory(*storageConfig, testapi.StorageMediaType(), api.Codecs, resourceEncoding, DefaultAPIResourceConfigSource())
config.StorageFactory = storageFactory
config.GenericConfig.LoopbackClientConfig = &restclient.Config{APIPath: "/api", ContentConfig: restclient.ContentConfig{NegotiatedSerializer: api.Codecs}}
config.GenericConfig.APIResourceConfigSource = DefaultAPIResourceConfigSource()
config.GenericConfig.PublicAddress = net.ParseIP("192.168.10.4")
config.GenericConfig.Serializer = api.Codecs
@ -112,7 +111,8 @@ func setUp(t *testing.T) (*Master, *etcdtesting.EtcdTestServer, Config, *assert.
t.Fatal(err)
}
master.legacyRESTStorage.NodeRegistry = registrytest.NewNodeRegistry([]string{"node1", "node2"}, api.NodeResources{})
fakeNodeClient := fake.NewSimpleClientset(registrytest.MakeNodeList([]string{"node1", "node2"}, api.NodeResources{}))
master.nodeClient = fakeNodeClient.Core().Nodes()
return master, server, *config, assert.New(t)
}
@ -161,8 +161,6 @@ func TestNew(t *testing.T) {
defer etcdserver.Terminate(t)
// Verify many of the variables match their config counterparts
assert.Equal(master.enableCoreControllers, config.EnableCoreControllers)
assert.Equal(master.tunneler, config.Tunneler)
assert.Equal(master.RequestContextMapper(), config.GenericConfig.RequestContextMapper)
assert.Equal(master.ClusterIP, config.GenericConfig.PublicAddress)
@ -258,25 +256,27 @@ func TestGetNodeAddresses(t *testing.T) {
defer etcdserver.Terminate(t)
// Fail case (no addresses associated with nodes)
nodes, _ := master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ := master.nodeClient.List(api.ListOptions{})
addrs, err := master.getNodeAddresses()
assert.Error(err, "getNodeAddresses should have caused an error as there are no addresses.")
assert.Equal([]string(nil), addrs)
// Pass case with External type IP
nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ = master.nodeClient.List(api.ListOptions{})
for index := range nodes.Items {
nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeExternalIP, Address: "127.0.0.1"}}
master.nodeClient.Update(&nodes.Items[index])
}
addrs, err = master.getNodeAddresses()
assert.NoError(err, "getNodeAddresses should not have returned an error.")
assert.Equal([]string{"127.0.0.1", "127.0.0.1"}, addrs)
// Pass case with LegacyHost type IP
nodes, _ = master.legacyRESTStorage.NodeRegistry.ListNodes(api.NewDefaultContext(), nil)
nodes, _ = master.nodeClient.List(api.ListOptions{})
for index := range nodes.Items {
nodes.Items[index].Status.Addresses = []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "127.0.0.2"}}
master.nodeClient.Update(&nodes.Items[index])
}
addrs, err = master.getNodeAddresses()
assert.NoError(err, "getNodeAddresses failback should not have returned an error.")
@ -484,38 +484,6 @@ func TestDiscoveryAtAPIS(t *testing.T) {
}
}
type FakeTunneler struct {
SecondsSinceSyncValue int64
SecondsSinceSSHKeySyncValue int64
}
func (t *FakeTunneler) Run(genericapiserver.AddressFunc) {}
func (t *FakeTunneler) Stop() {}
func (t *FakeTunneler) Dial(net, addr string) (net.Conn, error) { return nil, nil }
func (t *FakeTunneler) SecondsSinceSync() int64 { return t.SecondsSinceSyncValue }
func (t *FakeTunneler) SecondsSinceSSHKeySync() int64 { return t.SecondsSinceSSHKeySyncValue }
// TestIsTunnelSyncHealthy verifies that the 600 second lag test
// is honored.
func TestIsTunnelSyncHealthy(t *testing.T) {
assert := assert.New(t)
tunneler := &FakeTunneler{}
master := &Master{
GenericAPIServer: &genericapiserver.GenericAPIServer{},
tunneler: tunneler,
}
// Pass case: 540 second lag
tunneler.SecondsSinceSyncValue = 540
err := master.IsTunnelSyncHealthy(nil)
assert.NoError(err, "IsTunnelSyncHealthy() should not have returned an error.")
// Fail case: 720 second lag
tunneler.SecondsSinceSyncValue = 720
err = master.IsTunnelSyncHealthy(nil)
assert.Error(err, "IsTunnelSyncHealthy() should have returned an error.")
}
func writeResponseToFile(resp *http.Response, filename string) error {
defer resp.Body.Close()

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apimachinery/registered"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/autoscaling"
@ -45,6 +46,7 @@ import (
authorizerunion "k8s.io/kubernetes/pkg/auth/authorizer/union"
"k8s.io/kubernetes/pkg/auth/user"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -60,6 +62,7 @@ import (
"k8s.io/kubernetes/pkg/storage/storagebackend"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/plugin/pkg/admission/admit"
authenticatorunion "k8s.io/kubernetes/plugin/pkg/auth/authenticator/request/union"
@ -257,6 +260,28 @@ func startMasterOrDie(masterConfig *master.Config, incomingServer *httptest.Serv
// fire the post hooks ourselves
m.GenericAPIServer.RunPostStartHooks()
// wait for services to be ready
if masterConfig.EnableCoreControllers {
// TODO Once /healthz is updated for posthooks, we'll wait for good health
coreClient := coreclient.NewForConfigOrDie(&cfg)
svcWatch, err := coreClient.Services(api.NamespaceDefault).Watch(v1.ListOptions{})
if err != nil {
glog.Fatal(err)
}
_, err = watch.Until(30*time.Second, svcWatch, func(event watch.Event) (bool, error) {
if event.Type != watch.Added {
return false, nil
}
if event.Object.(*v1.Service).Name == "kubernetes" {
return true, nil
}
return false, nil
})
if err != nil {
glog.Fatal(err)
}
}
return m, s
}
@ -333,9 +358,10 @@ func NewMasterConfig() *master.Config {
OpenAPIDefinitions: openapi.OpenAPIDefinitions,
EnableOpenAPISupport: true,
},
StorageFactory: storageFactory,
EnableWatchCache: true,
KubeletClient: kubeletclient.FakeKubeletClient{},
StorageFactory: storageFactory,
EnableCoreControllers: true,
EnableWatchCache: true,
KubeletClient: kubeletclient.FakeKubeletClient{},
}
}

View File

@ -34,8 +34,7 @@ func TestMasterExportsSymbols(t *testing.T) {
EnableUISupport: false,
EnableLogsSupport: false,
}
m := &master.Master{
_ = &master.Master{
GenericAPIServer: &genericapiserver.GenericAPIServer{},
}
_ = (m).NewBootstrapController(master.EndpointReconcilerConfig{})
}

View File

@ -129,7 +129,6 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa
func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replicaset.ReplicaSetController, cache.SharedIndexInformer, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.EnableCoreControllers = false
_, s := framework.RunAMaster(masterConfig)
config := restclient.Config{Host: s.URL}

View File

@ -126,7 +126,6 @@ func verifyRemainingObjects(t *testing.T, clientSet clientset.Interface, namespa
func rmSetup(t *testing.T, enableGarbageCollector bool) (*httptest.Server, *replication.ReplicationManager, cache.SharedIndexInformer, clientset.Interface) {
masterConfig := framework.NewIntegrationTestMasterConfig()
masterConfig.EnableCoreControllers = false
_, s := framework.RunAMaster(masterConfig)
config := restclient.Config{Host: s.URL}