Merge pull request #37227 from deads2k/api-46-master-client

Automatic merge from submit-queue

switch bootstrap controller to use a client where possible

While looking at https://github.com/kubernetes/kubernetes/issues/37040, I found more places where we can use a normal client instead of a direct to etcd connection.

@wojtek-t you made similar changes in the same controller.
This commit is contained in:
Kubernetes Submit Queue 2016-12-02 08:45:07 -08:00 committed by GitHub
commit 2212c421f6
11 changed files with 87 additions and 120 deletions

View File

@ -24,7 +24,6 @@ go_library(
"//pkg/api/endpoints:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/api/install:go_default_library",
"//pkg/api/rest:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apimachinery/registered:go_default_library",
"//pkg/apis/apps/install:go_default_library",
@ -62,10 +61,8 @@ go_library(
"//pkg/registry/autoscaling/rest:go_default_library",
"//pkg/registry/batch/rest:go_default_library",
"//pkg/registry/certificates/rest:go_default_library",
"//pkg/registry/core/namespace:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/rest:go_default_library",
"//pkg/registry/core/service:go_default_library",
"//pkg/registry/core/service/ipallocator/controller:go_default_library",
"//pkg/registry/core/service/portallocator/controller:go_default_library",
"//pkg/registry/extensions/rest:go_default_library",
@ -120,7 +117,6 @@ go_test(
"//pkg/generated/openapi:go_default_library",
"//pkg/genericapiserver:go_default_library",
"//pkg/kubelet/client:go_default_library",
"//pkg/registry/registrytest:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/runtime/schema:go_default_library",
"//pkg/storage/etcd/testing:go_default_library",

View File

@ -25,13 +25,10 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/rest"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/registry/core/namespace"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
"k8s.io/kubernetes/pkg/registry/core/service"
servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
"k8s.io/kubernetes/pkg/util/async"
@ -41,13 +38,14 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
)
const kubernetesServiceName = "kubernetes"
// Controller is the controller manager for the core bootstrap Kubernetes controller
// loops, which manage creating the "kubernetes" service, the "default" and "kube-system"
// namespaces, and provide the IP repair check on service IPs
type Controller struct {
ServiceClient coreclient.ServicesGetter
NamespaceRegistry namespace.Registry
ServiceRegistry service.Registry
ServiceClient coreclient.ServicesGetter
NamespaceClient coreclient.NamespacesGetter
ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPInterval time.Duration
@ -65,6 +63,7 @@ type Controller struct {
PublicIP net.IP
// ServiceIP indicates where the kubernetes service will live. It may not be nil.
ServiceIP net.IP
ServicePort int
ExtraServicePorts []api.ServicePort
@ -76,11 +75,10 @@ type Controller struct {
}
// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *Config) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter) *Controller {
func (c *Config) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter) *Controller {
return &Controller{
ServiceClient: serviceClient,
NamespaceRegistry: legacyRESTStorage.NamespaceRegistry,
ServiceRegistry: legacyRESTStorage.ServiceRegistry,
ServiceClient: serviceClient,
NamespaceClient: nsClient,
EndpointReconciler: c.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.EndpointReconcilerConfig.Interval,
@ -119,8 +117,8 @@ func (c *Controller) Start() {
return
}
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceRegistry, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceRegistry, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry)
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
// run all of the controllers once prior to returning from Start.
if err := repairClusterIPs.RunOnce(); err != nil {
@ -173,23 +171,21 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
if err := c.CreateNamespaceIfNeeded(api.NamespaceDefault); err != nil {
return err
}
if c.ServiceIP != nil {
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
if err := c.CreateOrUpdateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
return err
}
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
if err := c.EndpointReconciler.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil {
return err
}
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
return err
}
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
return err
}
return nil
}
// CreateNamespaceIfNeeded will create a namespace if it doesn't already exist
func (c *Controller) CreateNamespaceIfNeeded(ns string) error {
ctx := api.NewContext()
if _, err := c.NamespaceRegistry.GetNamespace(ctx, ns); err == nil {
if _, err := c.NamespaceClient.Namespaces().Get(ns); err == nil {
// the namespace already exists
return nil
}
@ -199,7 +195,7 @@ func (c *Controller) CreateNamespaceIfNeeded(ns string) error {
Namespace: "",
},
}
err := c.NamespaceRegistry.CreateNamespace(ctx, newNs)
_, err := c.NamespaceClient.Namespaces().Create(newNs)
if err != nil && errors.IsAlreadyExists(err) {
err = nil
}
@ -241,7 +237,6 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndp
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType, reconcile bool) error {
ctx := api.NewDefaultContext()
if s, err := c.ServiceClient.Services(api.NamespaceDefault).Get(serviceName); err == nil {
// The service already exists.
if reconcile {
@ -268,13 +263,10 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
Type: serviceType,
},
}
if err := rest.BeforeCreate(service.Strategy, ctx, svc); err != nil {
return err
}
_, err := c.ServiceClient.Services(api.NamespaceDefault).Create(svc)
if err != nil && errors.IsAlreadyExists(err) {
err = nil
if errors.IsAlreadyExists(err) {
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
}
return err
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package master
import (
"errors"
"net"
"reflect"
"testing"
@ -25,7 +24,6 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/util/intstr"
)
@ -578,10 +576,6 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
}
for _, test := range create_tests {
master := Controller{}
registry := &registrytest.ServiceRegistry{
Err: errors.New("unable to get svc"),
}
master.ServiceRegistry = registry
fakeClient := fake.NewSimpleClientset()
master.ServiceClient = fakeClient.Core()
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, net.ParseIP("1.2.3.4"), test.servicePorts, test.serviceType, false)
@ -602,7 +596,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
}
}
if test.expectCreate == nil && len(creates) > 1 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, registry.List.Items)
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
}
@ -864,10 +858,6 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
}
for _, test := range reconcile_tests {
master := Controller{}
registry := &registrytest.ServiceRegistry{
Service: test.service,
}
master.ServiceRegistry = registry
fakeClient := fake.NewSimpleClientset(test.service)
master.ServiceClient = fakeClient.Core()
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, net.ParseIP("1.2.3.4"), test.servicePorts, test.serviceType, true)
@ -891,7 +881,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
}
@ -927,10 +917,6 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
}
for _, test := range non_reconcile_tests {
master := Controller{}
registry := &registrytest.ServiceRegistry{
Service: test.service,
}
master.ServiceRegistry = registry
fakeClient := fake.NewSimpleClientset(test.service)
master.ServiceClient = fakeClient.Core()
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, net.ParseIP("1.2.3.4"), test.servicePorts, test.serviceType, false)
@ -954,7 +940,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) {
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
}
}

View File

@ -272,8 +272,8 @@ func (m *Master) InstallLegacyAPI(c *Config, restOptionsGetter genericapiserver.
}
if c.EnableCoreControllers {
serviceClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, serviceClient)
coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient)
if err := m.GenericAPIServer.AddPostStartHook("bootstrap-controller", bootstrapController.PostStartHook); err != nil {
glog.Fatalf("Error registering PostStartHook %q: %v", "bootstrap-controller", err)
}

View File

@ -31,9 +31,7 @@ go_library(
"//pkg/registry/core/endpoint/etcd:go_default_library",
"//pkg/registry/core/event/etcd:go_default_library",
"//pkg/registry/core/limitrange/etcd:go_default_library",
"//pkg/registry/core/namespace:go_default_library",
"//pkg/registry/core/namespace/etcd:go_default_library",
"//pkg/registry/core/node:go_default_library",
"//pkg/registry/core/node/etcd:go_default_library",
"//pkg/registry/core/persistentvolume/etcd:go_default_library",
"//pkg/registry/core/persistentvolumeclaim/etcd:go_default_library",

View File

@ -43,9 +43,7 @@ import (
endpointsetcd "k8s.io/kubernetes/pkg/registry/core/endpoint/etcd"
eventetcd "k8s.io/kubernetes/pkg/registry/core/event/etcd"
limitrangeetcd "k8s.io/kubernetes/pkg/registry/core/limitrange/etcd"
"k8s.io/kubernetes/pkg/registry/core/namespace"
namespaceetcd "k8s.io/kubernetes/pkg/registry/core/namespace/etcd"
"k8s.io/kubernetes/pkg/registry/core/node"
nodeetcd "k8s.io/kubernetes/pkg/registry/core/node/etcd"
pvetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolume/etcd"
pvcetcd "k8s.io/kubernetes/pkg/registry/core/persistentvolumeclaim/etcd"
@ -86,10 +84,6 @@ type LegacyRESTStorageProvider struct {
// master.go for wiring controllers.
// TODO remove this by running the controller as a poststarthook
type LegacyRESTStorage struct {
NodeRegistry node.Registry
NamespaceRegistry namespace.Registry
ServiceRegistry service.Registry
EndpointRegistry endpoint.Registry
ServiceClusterIPAllocator rangeallocation.RangeRegistry
ServiceNodePortAllocator rangeallocation.RangeRegistry
}
@ -132,16 +126,14 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
configMapStorage := configmapetcd.NewREST(restOptionsGetter(api.Resource("configMaps")))
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage := namespaceetcd.NewREST(restOptionsGetter(api.Resource("namespaces")))
restStorage.NamespaceRegistry = namespace.NewRegistry(namespaceStorage)
endpointsStorage := endpointsetcd.NewREST(restOptionsGetter(api.Resource("endpoints")))
restStorage.EndpointRegistry = endpoint.NewRegistry(endpointsStorage)
endpointRegistry := endpoint.NewRegistry(endpointsStorage)
nodeStorage, err := nodeetcd.NewStorage(restOptionsGetter(api.Resource("nodes")), c.KubeletClientConfig, c.ProxyTransport)
if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err
}
restStorage.NodeRegistry = node.NewRegistry(nodeStorage.Node)
podStorage := podetcd.NewStorage(
restOptionsGetter(api.Resource("pods")),
@ -151,7 +143,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
)
serviceRESTStorage, serviceStatusStorage := serviceetcd.NewREST(restOptionsGetter(api.Resource("services")))
restStorage.ServiceRegistry = service.NewRegistry(serviceRESTStorage)
serviceRegistry := service.NewRegistry(serviceRESTStorage)
var serviceClusterIPRegistry rangeallocation.RangeRegistry
serviceClusterIPRange := c.ServiceIPRange
@ -185,7 +177,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi
controllerStorage := controlleretcd.NewStorage(restOptionsGetter(api.Resource("replicationControllers")))
serviceRest := service.NewStorage(restStorage.ServiceRegistry, restStorage.EndpointRegistry, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
serviceRest := service.NewStorage(serviceRegistry, endpointRegistry, ServiceClusterIPAllocator, ServiceNodePortAllocator, c.ProxyTransport)
restStorageMap := map[string]rest.Storage{
"pods": podStorage.Pod,

View File

@ -17,9 +17,9 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/service:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/wait:go_default_library",
@ -33,7 +33,7 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/registry/core/service/ipallocator:go_default_library",
"//pkg/registry/registrytest:go_default_library",
],
)

View File

@ -23,9 +23,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/client/retry"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
"k8s.io/kubernetes/pkg/registry/core/service"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wait"
@ -47,20 +47,20 @@ import (
// TODO: allocate new IPs if necessary
// TODO: perform repair?
type Repair struct {
interval time.Duration
registry service.Registry
network *net.IPNet
alloc rangeallocation.RangeRegistry
interval time.Duration
serviceClient coreclient.ServicesGetter
network *net.IPNet
alloc rangeallocation.RangeRegistry
}
// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, registry service.Registry, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
return &Repair{
interval: interval,
registry: registry,
network: network,
alloc: alloc,
interval: interval,
serviceClient: serviceClient,
network: network,
alloc: alloc,
}
}
@ -99,13 +99,12 @@ func (c *Repair) runOnce() error {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
}
ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
// We explicitly send no resource version, since the resource version
// of 'latest' is from a different collection, it's not comparable to
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
list, err := c.registry.ListServices(ctx, &api.ListOptions{})
list, err := c.serviceClient.Services(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
return fmt.Errorf("unable to refresh the service IP block: %v", err)
}

View File

@ -23,8 +23,8 @@ import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/registry/registrytest"
)
type mockRangeRegistry struct {
@ -49,12 +49,12 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
}
func TestRepair(t *testing.T) {
registry := registrytest.NewServiceRegistry()
fakeClient := fake.NewSimpleClientset()
_, cidr, _ := net.ParseCIDR("192.168.1.0/24")
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{},
}
r := NewRepair(0, registry, cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
@ -67,7 +67,7 @@ func TestRepair(t *testing.T) {
item: &api.RangeAllocation{},
updateErr: fmt.Errorf("test error"),
}
r = NewRepair(0, registry, cidr, ipregistry)
r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
t.Fatal(err)
}
@ -84,7 +84,7 @@ func TestRepairEmpty(t *testing.T) {
t.Fatal(err)
}
registry := registrytest.NewServiceRegistry()
fakeClient := fake.NewSimpleClientset()
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{
ObjectMeta: api.ObjectMeta{
@ -94,7 +94,7 @@ func TestRepairEmpty(t *testing.T) {
Data: dst.Data,
},
}
r := NewRepair(0, registry, cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}
@ -117,29 +117,32 @@ func TestRepairWithExisting(t *testing.T) {
t.Fatal(err)
}
registry := registrytest.NewServiceRegistry()
registry.List = api.ServiceList{
Items: []api.Service{
{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
{
Spec: api.ServiceSpec{ClusterIP: "192.168.1.100"},
},
{ // outside CIDR, will be dropped
Spec: api.ServiceSpec{ClusterIP: "192.168.0.1"},
},
{ // empty, ignored
Spec: api.ServiceSpec{ClusterIP: ""},
},
{ // duplicate, dropped
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
{ // headless
Spec: api.ServiceSpec{ClusterIP: "None"},
},
fakeClient := fake.NewSimpleClientset(
&api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "one", Name: "one"},
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
}
&api.Service{
ObjectMeta: api.ObjectMeta{Namespace: "two", Name: "two"},
Spec: api.ServiceSpec{ClusterIP: "192.168.1.100"},
},
&api.Service{ // outside CIDR, will be dropped
ObjectMeta: api.ObjectMeta{Namespace: "three", Name: "three"},
Spec: api.ServiceSpec{ClusterIP: "192.168.0.1"},
},
&api.Service{ // empty, ignored
ObjectMeta: api.ObjectMeta{Namespace: "four", Name: "four"},
Spec: api.ServiceSpec{ClusterIP: ""},
},
&api.Service{ // duplicate, dropped
ObjectMeta: api.ObjectMeta{Namespace: "five", Name: "five"},
Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"},
},
&api.Service{ // headless
ObjectMeta: api.ObjectMeta{Namespace: "six", Name: "six"},
Spec: api.ServiceSpec{ClusterIP: "None"},
},
)
ipregistry := &mockRangeRegistry{
item: &api.RangeAllocation{
@ -150,7 +153,7 @@ func TestRepairWithExisting(t *testing.T) {
Data: dst.Data,
},
}
r := NewRepair(0, registry, cidr, ipregistry)
r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
if err := r.RunOnce(); err != nil {
t.Fatal(err)
}

View File

@ -17,6 +17,7 @@ go_library(
deps = [
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/registry/core/rangeallocation:go_default_library",
"//pkg/registry/core/service:go_default_library",

View File

@ -22,6 +22,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/client/retry"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
"k8s.io/kubernetes/pkg/registry/core/service"
@ -33,20 +34,20 @@ import (
// See ipallocator/controller/repair.go; this is a copy for ports.
type Repair struct {
interval time.Duration
registry service.Registry
portRange net.PortRange
alloc rangeallocation.RangeRegistry
interval time.Duration
serviceClient coreclient.ServicesGetter
portRange net.PortRange
alloc rangeallocation.RangeRegistry
}
// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepair(interval time.Duration, registry service.Registry, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
return &Repair{
interval: interval,
registry: registry,
portRange: portRange,
alloc: alloc,
interval: interval,
serviceClient: serviceClient,
portRange: portRange,
alloc: alloc,
}
}
@ -88,13 +89,12 @@ func (c *Repair) runOnce() error {
return fmt.Errorf("unable to refresh the port block: %v", err)
}
ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll)
// We explicitly send no resource version, since the resource version
// of 'latest' is from a different collection, it's not comparable to
// the service collection. The caching layer keeps per-collection RVs,
// and this is proper, since in theory the collections could be hosted
// in separate etcd (or even non-etcd) instances.
list, err := c.registry.ListServices(ctx, &api.ListOptions{})
list, err := c.serviceClient.Services(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
return fmt.Errorf("unable to refresh the port block: %v", err)
}