From 9c33a913de0c63ca9ab59322b1bce78403c020fc Mon Sep 17 00:00:00 2001 From: "zuoxiu.jm" <291271447@qq.com> Date: Thu, 1 Nov 2018 12:11:40 +0800 Subject: [PATCH] use loopback client connection instead of direct etcd call in master lease --- pkg/master/BUILD | 1 - pkg/master/client_ca_hook.go | 2 +- pkg/master/client_util.go | 23 +- pkg/master/controller.go | 35 +- pkg/master/controller_test.go | 556 +++++++++--------- pkg/master/master.go | 23 +- pkg/master/reconcilers/BUILD | 11 +- pkg/master/reconcilers/lease.go | 64 +- pkg/master/reconcilers/lease_test.go | 413 ++++++------- pkg/master/reconcilers/mastercount.go | 38 +- pkg/master/reconcilers/none.go | 6 +- pkg/master/reconcilers/reconcilers.go | 6 +- .../core/service/ipallocator/controller/BUILD | 7 +- .../service/ipallocator/controller/repair.go | 10 +- .../ipallocator/controller/repair_test.go | 27 +- .../service/portallocator/controller/BUILD | 5 +- .../portallocator/controller/repair.go | 11 +- .../portallocator/controller/repair_test.go | 37 +- 18 files changed, 651 insertions(+), 624 deletions(-) diff --git a/pkg/master/BUILD b/pkg/master/BUILD index 429f489c8ea..85c3d605bfd 100644 --- a/pkg/master/BUILD +++ b/pkg/master/BUILD @@ -50,7 +50,6 @@ go_library( "//pkg/registry/batch/rest:go_default_library", "//pkg/registry/certificates/rest:go_default_library", "//pkg/registry/coordination/rest:go_default_library", - "//pkg/registry/core/endpoint/storage:go_default_library", "//pkg/registry/core/rangeallocation:go_default_library", "//pkg/registry/core/rest:go_default_library", "//pkg/registry/core/service/ipallocator:go_default_library", diff --git a/pkg/master/client_ca_hook.go b/pkg/master/client_ca_hook.go index 4e5d4ff5198..8b0e023ed03 100644 --- a/pkg/master/client_ca_hook.go +++ b/pkg/master/client_ca_hook.go @@ -69,7 +69,7 @@ func (h ClientCARegistrationHook) PostStartHook(hookContext genericapiserver.Pos // tryToWriteClientCAs is here for unit testing with a fake client. This is a wait.ConditionFunc so the bool // indicates if the condition was met. True when its finished, false when it should retry. func (h ClientCARegistrationHook) tryToWriteClientCAs(client coreclient.CoreInterface) (bool, error) { - if err := createNamespaceIfNeeded(client, metav1.NamespaceSystem); err != nil { + if err := createNamespaceIfNeededWithInternalClient(client, metav1.NamespaceSystem); err != nil { utilruntime.HandleError(err) return false, nil } diff --git a/pkg/master/client_util.go b/pkg/master/client_util.go index 3868fbae5c5..9323f1984f7 100644 --- a/pkg/master/client_util.go +++ b/pkg/master/client_util.go @@ -17,13 +17,34 @@ limitations under the License. package master import ( + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" api "k8s.io/kubernetes/pkg/apis/core" coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" ) -func createNamespaceIfNeeded(c coreclient.NamespacesGetter, ns string) error { +func createNamespaceIfNeeded(c corev1client.NamespacesGetter, ns string) error { + if _, err := c.Namespaces().Get(ns, metav1.GetOptions{}); err == nil { + // the namespace already exists + return nil + } + newNs := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: ns, + Namespace: "", + }, + } + _, err := c.Namespaces().Create(newNs) + if err != nil && errors.IsAlreadyExists(err) { + err = nil + } + return err +} + +// TODO(yue9944882): Remove it once we switch ClientCARegistrationHook to external types +func createNamespaceIfNeededWithInternalClient(c coreclient.NamespacesGetter, ns string) error { if _, err := c.Namespaces().Get(ns, metav1.GetOptions{}); err == nil { // the namespace already exists return nil diff --git a/pkg/master/controller.go b/pkg/master/controller.go index f4cb74d8da4..e2e6f24e686 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -31,8 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" genericapiserver "k8s.io/apiserver/pkg/server" utilfeature "k8s.io/apiserver/pkg/util/feature" - api "k8s.io/kubernetes/pkg/apis/core" - coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/master/reconcilers" "k8s.io/kubernetes/pkg/registry/core/rangeallocation" @@ -49,9 +48,9 @@ const kubernetesServiceName = "kubernetes" // "default", "kube-system" and "kube-public" namespaces, and provide the IP // repair check on service IPs type Controller struct { - ServiceClient coreclient.ServicesGetter - NamespaceClient coreclient.NamespacesGetter - EventClient coreclient.EventsGetter + ServiceClient corev1client.ServicesGetter + NamespaceClient corev1client.NamespacesGetter + EventClient corev1client.EventsGetter ServiceClusterIPRegistry rangeallocation.RangeRegistry ServiceClusterIPInterval time.Duration @@ -72,8 +71,8 @@ type Controller struct { // ServiceIP indicates where the kubernetes service will live. It may not be nil. ServiceIP net.IP ServicePort int - ExtraServicePorts []api.ServicePort - ExtraEndpointPorts []api.EndpointPort + ExtraServicePorts []corev1.ServicePort + ExtraEndpointPorts []corev1.EndpointPort PublicServicePort int KubernetesServiceNodePort int @@ -81,7 +80,7 @@ type Controller struct { } // NewBootstrapController returns a controller for watching the core capabilities of the master -func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient coreclient.ServicesGetter, nsClient coreclient.NamespacesGetter, eventClient coreclient.EventsGetter) *Controller { +func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter) *Controller { _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort() if err != nil { glog.Fatalf("failed to get listener address: %v", err) @@ -230,17 +229,17 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error { // createPortAndServiceSpec creates an array of service ports. // If the NodePort value is 0, just the servicePort is used, otherwise, a node port is exposed. -func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string, extraServicePorts []api.ServicePort) ([]api.ServicePort, api.ServiceType) { +func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string, extraServicePorts []corev1.ServicePort) ([]corev1.ServicePort, corev1.ServiceType) { //Use the Cluster IP type for the service port if NodePort isn't provided. //Otherwise, we will be binding the master service to a NodePort. - servicePorts := []api.ServicePort{{Protocol: api.ProtocolTCP, + servicePorts := []corev1.ServicePort{{Protocol: corev1.ProtocolTCP, Port: int32(servicePort), Name: servicePortName, TargetPort: intstr.FromInt(targetServicePort)}} - serviceType := api.ServiceTypeClusterIP + serviceType := corev1.ServiceTypeClusterIP if nodePort > 0 { servicePorts[0].NodePort = int32(nodePort) - serviceType = api.ServiceTypeNodePort + serviceType = corev1.ServiceTypeNodePort } if extraServicePorts != nil { servicePorts = append(servicePorts, extraServicePorts...) @@ -249,8 +248,8 @@ func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort i } // createEndpointPortSpec creates an array of endpoint ports -func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []api.EndpointPort) []api.EndpointPort { - endpointPorts := []api.EndpointPort{{Protocol: api.ProtocolTCP, +func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []corev1.EndpointPort) []corev1.EndpointPort { + endpointPorts := []corev1.EndpointPort{{Protocol: corev1.ProtocolTCP, Port: int32(endpointPort), Name: endpointPortName, }} @@ -262,7 +261,7 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndp // CreateMasterServiceIfNeeded will create the specified service if it // doesn't already exist. -func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType, reconcile bool) error { +func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error { if s, err := c.ServiceClient.Services(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}); err == nil { // The service already exists. if reconcile { @@ -274,18 +273,18 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser } return nil } - svc := &api.Service{ + svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, Namespace: metav1.NamespaceDefault, Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"}, }, - Spec: api.ServiceSpec{ + Spec: corev1.ServiceSpec{ Ports: servicePorts, // maintained by this code, not by the pod selector Selector: nil, ClusterIP: serviceIP.String(), - SessionAffinity: api.ServiceAffinityNone, + SessionAffinity: corev1.ServiceAffinityNone, Type: serviceType, }, } diff --git a/pkg/master/controller_test.go b/pkg/master/controller_test.go index 8e31a71ccfc..cfab7074ca2 100644 --- a/pkg/master/controller_test.go +++ b/pkg/master/controller_test.go @@ -21,11 +21,11 @@ import ( "reflect" "testing" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" - api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/master/reconcilers" ) @@ -38,23 +38,23 @@ func TestReconcileEndpoints(t *testing.T) { testName string serviceName string ip string - endpointPorts []api.EndpointPort + endpointPorts []corev1.EndpointPort additionalMasters int - endpoints *api.EndpointsList - expectUpdate *api.Endpoints // nil means none expected - expectCreate *api.Endpoints // nil means none expected + endpoints *corev1.EndpointsList + expectUpdate *corev1.Endpoints // nil means none expected + expectCreate *corev1.Endpoints // nil means none expected }{ { testName: "no existing endpoints", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: nil, - expectCreate: &api.Endpoints{ + expectCreate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -62,13 +62,13 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints satisfy", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -77,21 +77,21 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints satisfy but too many", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -99,33 +99,33 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints satisfy but too many + extra masters", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, additionalMasters: 3, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "1.2.3.4"}, {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "1.2.3.4"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -133,33 +133,33 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints satisfy but too many + extra masters + delete first", serviceName: "foo", ip: "4.3.2.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, additionalMasters: 3, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "1.2.3.4"}, {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -167,17 +167,17 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints satisfy and endpoint addresses length less than master count", serviceName: "foo", ip: "4.3.2.2", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, additionalMasters: 3, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -187,27 +187,27 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints current IP missing and address length less than master count", serviceName: "foo", ip: "4.3.2.2", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, additionalMasters: 3, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "4.3.2.1"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -215,21 +215,21 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints wrong name", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("bar"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectCreate: &api.Endpoints{ + expectCreate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -237,21 +237,21 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints wrong IP", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -259,21 +259,21 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints wrong port", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -281,21 +281,21 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints wrong protocol", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -303,21 +303,21 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints wrong port name", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -325,17 +325,17 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints extra service ports satisfy", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ + endpointPorts: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"}, }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"}, @@ -348,24 +348,24 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints extra service ports missing port", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ + endpointPorts: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, }, @@ -376,13 +376,13 @@ func TestReconcileEndpoints(t *testing.T) { testName: "no existing sctp endpoints", serviceName: "boo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "boo", Port: 7777, Protocol: "SCTP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "boo", Port: 7777, Protocol: "SCTP"}}, endpoints: nil, - expectCreate: &api.Endpoints{ + expectCreate: &corev1.Endpoints{ ObjectMeta: om("boo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "boo", Port: 7777, Protocol: "SCTP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "boo", Port: 7777, Protocol: "SCTP"}}, }}, }, }, @@ -440,26 +440,26 @@ func TestReconcileEndpoints(t *testing.T) { testName string serviceName string ip string - endpointPorts []api.EndpointPort + endpointPorts []corev1.EndpointPort additionalMasters int - endpoints *api.EndpointsList - expectUpdate *api.Endpoints // nil means none expected - expectCreate *api.Endpoints // nil means none expected + endpoints *corev1.EndpointsList + expectUpdate *corev1.Endpoints // nil means none expected + expectCreate *corev1.Endpoints // nil means none expected }{ { testName: "existing endpoints extra service ports missing port no update", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ + endpointPorts: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -469,24 +469,24 @@ func TestReconcileEndpoints(t *testing.T) { testName: "existing endpoints extra service ports, wrong ports, wrong IP", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ + endpointPorts: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -494,13 +494,13 @@ func TestReconcileEndpoints(t *testing.T) { testName: "no existing endpoints", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: nil, - expectCreate: &api.Endpoints{ + expectCreate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -565,27 +565,27 @@ func TestCreateOrUpdateMasterService(t *testing.T) { create_tests := []struct { testName string serviceName string - servicePorts []api.ServicePort - serviceType api.ServiceType - expectCreate *api.Service // nil means none expected + servicePorts []corev1.ServicePort + serviceType corev1.ServiceType + expectCreate *corev1.Service // nil means none expected }{ { testName: "service does not exist", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - expectCreate: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + expectCreate: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, }, @@ -606,7 +606,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { t.Errorf("case %q: unexpected creations: %v", test.testName, creates) } else { obj := creates[0].GetObject() - if e, a := test.expectCreate.Spec, obj.(*api.Service).Spec; !reflect.DeepEqual(e, a) { + if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } @@ -619,254 +619,254 @@ func TestCreateOrUpdateMasterService(t *testing.T) { reconcile_tests := []struct { testName string serviceName string - servicePorts []api.ServicePort - serviceType api.ServiceType - service *api.Service - expectUpdate *api.Service // nil means none expected + servicePorts []corev1.ServicePort + serviceType corev1.ServiceType + service *corev1.Service + expectUpdate *corev1.Service // nil means none expected }{ { testName: "service definition wrong port", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8000, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, - expectUpdate: &api.Service{ + expectUpdate: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, }, { testName: "service definition missing port", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, {Name: "baz", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt(1000)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, - expectUpdate: &api.Service{ + expectUpdate: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, {Name: "baz", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt(1000)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, }, { testName: "service definition incorrect port", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "bar", Port: 1000, Protocol: "UDP", TargetPort: intstr.FromInt(1000)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, - expectUpdate: &api.Service{ + expectUpdate: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, }, { testName: "service definition incorrect port name", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 1000, Protocol: "UDP", TargetPort: intstr.FromInt(1000)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, - expectUpdate: &api.Service{ + expectUpdate: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, }, { testName: "service definition incorrect target port", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(1000)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, - expectUpdate: &api.Service{ + expectUpdate: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, }, { testName: "service definition incorrect protocol", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "UDP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, - expectUpdate: &api.Service{ + expectUpdate: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, }, { testName: "service definition has incorrect type", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeNodePort, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeNodePort, }, }, - expectUpdate: &api.Service{ + expectUpdate: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, }, { testName: "service definition satisfies", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, expectUpdate: nil, @@ -891,7 +891,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { t.Errorf("case %q: unexpected updates: %v", test.testName, updates) } else { obj := updates[0].GetObject() - if e, a := test.expectUpdate.Spec, obj.(*api.Service).Spec; !reflect.DeepEqual(e, a) { + if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } @@ -904,28 +904,28 @@ func TestCreateOrUpdateMasterService(t *testing.T) { non_reconcile_tests := []struct { testName string serviceName string - servicePorts []api.ServicePort - serviceType api.ServiceType - service *api.Service - expectUpdate *api.Service // nil means none expected + servicePorts []corev1.ServicePort + serviceType corev1.ServiceType + service *corev1.Service + expectUpdate *corev1.Service // nil means none expected }{ { testName: "service definition wrong port, no expected update", serviceName: "foo", - servicePorts: []api.ServicePort{ + servicePorts: []corev1.ServicePort{ {Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}, }, - serviceType: api.ServiceTypeClusterIP, - service: &api.Service{ + serviceType: corev1.ServiceTypeClusterIP, + service: &corev1.Service{ ObjectMeta: om("foo"), - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{ + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ {Name: "foo", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt(1000)}, }, Selector: nil, ClusterIP: "1.2.3.4", - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Type: corev1.ServiceTypeClusterIP, }, }, expectUpdate: nil, @@ -950,7 +950,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { t.Errorf("case %q: unexpected updates: %v", test.testName, updates) } else { obj := updates[0].GetObject() - if e, a := test.expectUpdate.Spec, obj.(*api.Service).Spec; !reflect.DeepEqual(e, a) { + if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } diff --git a/pkg/master/master.go b/pkg/master/master.go index a7107ffc5ff..97c86986117 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -68,12 +68,10 @@ import ( "k8s.io/client-go/informers" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" api "k8s.io/kubernetes/pkg/apis/core" - coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master/reconcilers" "k8s.io/kubernetes/pkg/master/tunneler" - endpointsstorage "k8s.io/kubernetes/pkg/registry/core/endpoint/storage" "k8s.io/kubernetes/pkg/routes" "k8s.io/kubernetes/pkg/serviceaccount" nodeutil "k8s.io/kubernetes/pkg/util/node" @@ -144,10 +142,10 @@ type ExtraConfig struct { // service because this pkg is linked by out-of-tree projects // like openshift which want to use the GenericAPIServer but also do // more stuff. - ExtraServicePorts []api.ServicePort + ExtraServicePorts []apiv1.ServicePort // Additional ports to be exposed on the GenericAPIServer endpoints // Port names should align with ports defined in ExtraServicePorts - ExtraEndpointPorts []api.EndpointPort + ExtraEndpointPorts []apiv1.EndpointPort // If non-zero, the "kubernetes" services uses this port as NodePort. KubernetesServiceNodePort int @@ -206,7 +204,7 @@ type Master struct { } func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler { - endpointClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) + endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointClient) } @@ -215,6 +213,7 @@ func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler { } func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler { + endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) ttl := c.ExtraConfig.MasterEndpointReconcileTTL config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo")) if err != nil { @@ -224,18 +223,8 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler { if err != nil { glog.Fatalf("Error creating storage factory: %v", err) } - endpointConfig, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("endpoints")) - if err != nil { - glog.Fatalf("Error getting storage config: %v", err) - } - endpointsStorage := endpointsstorage.NewREST(generic.RESTOptions{ - StorageConfig: endpointConfig, - Decorator: generic.UndecoratedStorage, - DeleteCollectionWorkers: 0, - ResourcePrefix: c.ExtraConfig.StorageFactory.ResourcePrefix(api.Resource("endpoints")), - }) masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl) - return reconcilers.NewLeaseEndpointReconciler(endpointsStorage.Store, masterLeases) + return reconcilers.NewLeaseEndpointReconciler(endpointClient, masterLeases) } func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler { @@ -386,7 +375,7 @@ func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic. } controllerName := "bootstrap-controller" - coreClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) + coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) diff --git a/pkg/master/reconcilers/BUILD b/pkg/master/reconcilers/BUILD index 09d274bea82..97375d1453e 100644 --- a/pkg/master/reconcilers/BUILD +++ b/pkg/master/reconcilers/BUILD @@ -12,15 +12,14 @@ go_library( importpath = "k8s.io/kubernetes/pkg/master/reconcilers", visibility = ["//visibility:public"], deps = [ - "//pkg/api/endpoints:go_default_library", - "//pkg/apis/core:go_default_library", - "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", + "//pkg/api/v1/endpoints:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", - "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//vendor/github.com/golang/glog:go_default_library", ], @@ -31,9 +30,9 @@ go_test( srcs = ["lease_test.go"], embed = [":go_default_library"], deps = [ - "//pkg/apis/core:go_default_library", - "//pkg/registry/registrytest:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", ], ) diff --git a/pkg/master/reconcilers/lease.go b/pkg/master/reconcilers/lease.go index 65a397db0b3..ec9f7ba63a8 100644 --- a/pkg/master/reconcilers/lease.go +++ b/pkg/master/reconcilers/lease.go @@ -30,14 +30,14 @@ import ( "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kruntime "k8s.io/apimachinery/pkg/runtime" apirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" - "k8s.io/kubernetes/pkg/api/endpoints" - api "k8s.io/kubernetes/pkg/apis/core" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" ) // Leases is an interface which assists in managing the set of active masters @@ -62,7 +62,7 @@ var _ Leases = &storageLeases{} // ListLeases retrieves a list of the current master IPs from storage func (s *storageLeases) ListLeases() ([]string, error) { - ipInfoList := &api.EndpointsList{} + ipInfoList := &corev1.EndpointsList{} if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil { return nil, err } @@ -80,12 +80,12 @@ func (s *storageLeases) ListLeases() ([]string, error) { // UpdateLease resets the TTL on a master IP in storage func (s *storageLeases) UpdateLease(ip string) error { key := path.Join(s.baseKey, ip) - return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &api.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) { + return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) { // just make sure we've got the right IP set, and then refresh the TTL - existing := input.(*api.Endpoints) - existing.Subsets = []api.EndpointSubset{ + existing := input.(*corev1.Endpoints) + existing.Subsets = []corev1.EndpointSubset{ { - Addresses: []api.EndpointAddress{{IP: ip}}, + Addresses: []corev1.EndpointAddress{{IP: ip}}, }, } @@ -106,7 +106,7 @@ func (s *storageLeases) UpdateLease(ip string) error { // RemoveLease removes the lease on a master IP in storage func (s *storageLeases) RemoveLease(ip string) error { - return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &api.Endpoints{}, nil) + return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil) } // NewLeases creates a new etcd-based Leases implementation. @@ -119,16 +119,16 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio } type leaseEndpointReconciler struct { - endpointStorage rest.StandardStorage + endpointClient corev1client.EndpointsGetter masterLeases Leases stopReconcilingCalled bool reconcilingLock sync.Mutex } // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler -func NewLeaseEndpointReconciler(endpointStorage rest.StandardStorage, masterLeases Leases) EndpointReconciler { +func NewLeaseEndpointReconciler(endpointClient corev1client.EndpointsGetter, masterLeases Leases) EndpointReconciler { return &leaseEndpointReconciler{ - endpointStorage: endpointStorage, + endpointClient: endpointClient, masterLeases: masterLeases, stopReconcilingCalled: false, } @@ -141,7 +141,7 @@ func NewLeaseEndpointReconciler(endpointStorage rest.StandardStorage, masterLeas // expire. ReconcileEndpoints will notice that the endpoints object is // different from the directory listing, and update the endpoints object // accordingly. -func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { +func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error { r.reconcilingLock.Lock() defer r.reconcilingLock.Unlock() @@ -159,25 +159,21 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net. return r.doReconcile(serviceName, endpointPorts, reconcilePorts) } -func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []api.EndpointPort, reconcilePorts bool) error { - ctx := apirequest.NewDefaultContext() - - // Retrieve the current list of endpoints... - var e *api.Endpoints - obj, err := r.endpointStorage.Get(ctx, serviceName, &metav1.GetOptions{}) +func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error { + e, err := r.endpointClient.Endpoints(corev1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) + shouldCreate := false if err != nil { if !errors.IsNotFound(err) { return err } - e = &api.Endpoints{ + shouldCreate = true + e = &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, - Namespace: api.NamespaceDefault, + Namespace: corev1.NamespaceDefault, }, } - } else { - e = obj.(*api.Endpoints) } // ... and the list of master IP keys from etcd @@ -201,21 +197,21 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts if !formatCorrect { // Something is egregiously wrong, just re-make the endpoints record. - e.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{}, + e.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{}, Ports: endpointPorts, }} } if !formatCorrect || !ipCorrect { // repopulate the addresses according to the expected IPs from etcd - e.Subsets[0].Addresses = make([]api.EndpointAddress, len(masterIPs)) + e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs)) for ind, ip := range masterIPs { - e.Subsets[0].Addresses[ind] = api.EndpointAddress{IP: ip} + e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip} } // Lexicographic order is retained by this step. - e.Subsets = endpoints.RepackSubsets(e.Subsets) + e.Subsets = endpointsv1.RepackSubsets(e.Subsets) } if !portsCorrect { @@ -224,7 +220,13 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts } glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs) - _, _, err = r.endpointStorage.Update(ctx, e.Name, rest.DefaultUpdatedObjectInfo(e), rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}) + if shouldCreate { + if _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Create(e); errors.IsAlreadyExists(err) { + err = nil + } + } else { + _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Update(e) + } return err } @@ -236,7 +238,7 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts // * ipsCorrect when the addresses in the endpoints match the expected addresses list // * portsCorrect is true when endpoint ports exactly match provided ports. // portsCorrect is only evaluated when reconcilePorts is set to true. -func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string, ports []api.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) { +func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) { if len(e.Subsets) != 1 { return false, false, false } @@ -281,7 +283,7 @@ func checkEndpointSubsetFormatWithLease(e *api.Endpoints, expectedIPs []string, return true, ipsCorrect, portsCorrect } -func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { +func (r *leaseEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { r.reconcilingLock.Lock() defer r.reconcilingLock.Unlock() r.stopReconcilingCalled = true diff --git a/pkg/master/reconcilers/lease_test.go b/pkg/master/reconcilers/lease_test.go index 97000d39fba..3c8402a7867 100644 --- a/pkg/master/reconcilers/lease_test.go +++ b/pkg/master/reconcilers/lease_test.go @@ -26,9 +26,9 @@ import ( "reflect" "testing" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/registry/registrytest" + "k8s.io/client-go/kubernetes/fake" ) type fakeLeases struct { @@ -76,7 +76,7 @@ func (f *fakeLeases) GetUpdatedKeys() []string { } func TestLeaseEndpointReconciler(t *testing.T) { - ns := api.NamespaceDefault + ns := corev1.NamespaceDefault om := func(name string) metav1.ObjectMeta { return metav1.ObjectMeta{Namespace: ns, Name: name} } @@ -84,22 +84,22 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName string serviceName string ip string - endpointPorts []api.EndpointPort + endpointPorts []corev1.EndpointPort endpointKeys []string - endpoints *api.EndpointsList - expectUpdate *api.Endpoints // nil means none expected + endpoints *corev1.EndpointsList + expectUpdate *corev1.Endpoints // nil means none expected }{ { testName: "no existing endpoints", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: nil, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -107,13 +107,13 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints satisfy", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -122,14 +122,14 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints satisfy + refresh existing key", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"1.2.3.4"}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -138,21 +138,21 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints satisfy but too many", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -160,33 +160,33 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints satisfy but too many + extra masters", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "1.2.3.4"}, {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "1.2.3.4"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -194,33 +194,33 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints satisfy but too many + extra masters + delete first", serviceName: "foo", ip: "4.3.2.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "1.2.3.4"}, {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -228,27 +228,27 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints current IP missing", serviceName: "foo", ip: "4.3.2.2", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"4.3.2.1"}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "4.3.2.1"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "4.3.2.1"}, {IP: "4.3.2.2"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -256,21 +256,21 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints wrong name", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("bar"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -278,21 +278,21 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints wrong IP", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -300,21 +300,21 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints wrong port", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -322,21 +322,21 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints wrong protocol", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -344,21 +344,21 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints wrong port name", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -366,17 +366,17 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints extra service ports satisfy", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ + endpointPorts: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"}, }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"}, @@ -389,24 +389,24 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints extra service ports missing port", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ + endpointPorts: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, }, @@ -417,24 +417,29 @@ func TestLeaseEndpointReconciler(t *testing.T) { for _, test := range reconcileTests { fakeLeases := newFakeLeases() fakeLeases.SetKeys(test.endpointKeys) - registry := ®istrytest.EndpointRegistry{ - Endpoints: test.endpoints, + clientset := fake.NewSimpleClientset() + if test.endpoints != nil { + for _, ep := range test.endpoints.Items { + if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(&ep); err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + continue + } + } } - r := NewLeaseEndpointReconciler(registry, fakeLeases) + r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases) err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } + actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(test.serviceName, metav1.GetOptions{}) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } if test.expectUpdate != nil { - if len(registry.Updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) - } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } - if test.expectUpdate == nil && len(registry.Updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) - } if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip { t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys) } @@ -444,25 +449,25 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName string serviceName string ip string - endpointPorts []api.EndpointPort + endpointPorts []corev1.EndpointPort endpointKeys []string - endpoints *api.EndpointsList - expectUpdate *api.Endpoints // nil means none expected + endpoints *corev1.EndpointsList + expectUpdate *corev1.Endpoints // nil means none expected }{ { testName: "existing endpoints extra service ports missing port no update", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ + endpointPorts: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -472,24 +477,24 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "existing endpoints extra service ports, wrong ports, wrong IP", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{ + endpointPorts: []corev1.EndpointPort{ {Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"}, }, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -497,13 +502,13 @@ func TestLeaseEndpointReconciler(t *testing.T) { testName: "no existing endpoints", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: nil, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -512,24 +517,29 @@ func TestLeaseEndpointReconciler(t *testing.T) { t.Run(test.testName, func(t *testing.T) { fakeLeases := newFakeLeases() fakeLeases.SetKeys(test.endpointKeys) - registry := ®istrytest.EndpointRegistry{ - Endpoints: test.endpoints, + clientset := fake.NewSimpleClientset() + if test.endpoints != nil { + for _, ep := range test.endpoints.Items { + if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(&ep); err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + continue + } + } } - r := NewLeaseEndpointReconciler(registry, fakeLeases) + r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases) err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } + actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(test.serviceName, metav1.GetOptions{}) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } if test.expectUpdate != nil { - if len(registry.Updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) - } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } - if test.expectUpdate == nil && len(registry.Updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) - } if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip { t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys) } @@ -538,7 +548,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { } func TestLeaseStopReconciling(t *testing.T) { - ns := api.NamespaceDefault + ns := corev1.NamespaceDefault om := func(name string) metav1.ObjectMeta { return metav1.ObjectMeta{Namespace: ns, Name: name} } @@ -546,40 +556,40 @@ func TestLeaseStopReconciling(t *testing.T) { testName string serviceName string ip string - endpointPorts []api.EndpointPort + endpointPorts []corev1.EndpointPort endpointKeys []string - endpoints *api.EndpointsList - expectUpdate *api.Endpoints // nil means none expected + endpoints *corev1.EndpointsList + expectUpdate *corev1.Endpoints // nil means none expected }{ { testName: "successful stop reconciling", serviceName: "foo", ip: "1.2.3.4", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "1.2.3.4"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, - expectUpdate: &api.Endpoints{ + expectUpdate: &corev1.Endpoints{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -587,19 +597,19 @@ func TestLeaseStopReconciling(t *testing.T) { testName: "stop reconciling with ip not in endpoint ip list", serviceName: "foo", ip: "5.6.7.8", - endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, - endpoints: &api.EndpointsList{ - Items: []api.Endpoints{{ + endpoints: &corev1.EndpointsList{ + Items: []corev1.Endpoints{{ ObjectMeta: om("foo"), - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{ + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{ {IP: "1.2.3.4"}, {IP: "4.3.2.2"}, {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -609,24 +619,27 @@ func TestLeaseStopReconciling(t *testing.T) { t.Run(test.testName, func(t *testing.T) { fakeLeases := newFakeLeases() fakeLeases.SetKeys(test.endpointKeys) - registry := ®istrytest.EndpointRegistry{ - Endpoints: test.endpoints, + clientset := fake.NewSimpleClientset() + for _, ep := range test.endpoints.Items { + if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(&ep); err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + continue + } } - r := NewLeaseEndpointReconciler(registry, fakeLeases) + r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases) err := r.StopReconciling(test.serviceName, net.ParseIP(test.ip), test.endpointPorts) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } + actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(test.serviceName, metav1.GetOptions{}) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } if test.expectUpdate != nil { - if len(registry.Updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) - } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { + if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } } - if test.expectUpdate == nil && len(registry.Updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates) - } for _, key := range fakeLeases.GetUpdatedKeys() { if key == test.ip { t.Errorf("case %q: Found ip %s in leases but shouldn't be there", test.testName, key) diff --git a/pkg/master/reconcilers/mastercount.go b/pkg/master/reconcilers/mastercount.go index d7aa4c77185..479883e70a2 100644 --- a/pkg/master/reconcilers/mastercount.go +++ b/pkg/master/reconcilers/mastercount.go @@ -22,26 +22,26 @@ import ( "sync" "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/retry" - "k8s.io/kubernetes/pkg/api/endpoints" - api "k8s.io/kubernetes/pkg/apis/core" - coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" ) // masterCountEndpointReconciler reconciles endpoints based on a specified expected number of // masters. masterCountEndpointReconciler implements EndpointReconciler. type masterCountEndpointReconciler struct { masterCount int - endpointClient coreclient.EndpointsGetter + endpointClient corev1client.EndpointsGetter stopReconcilingCalled bool reconcilingLock sync.Mutex } // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a // specified expected number of masters. -func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient.EndpointsGetter) EndpointReconciler { +func NewMasterCountEndpointReconciler(masterCount int, endpointClient corev1client.EndpointsGetter) EndpointReconciler { return &masterCountEndpointReconciler{ masterCount: masterCount, endpointClient: endpointClient, @@ -60,7 +60,7 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient // * All apiservers MUST know and agree on the number of apiservers expected // to be running (c.masterCount). // * ReconcileEndpoints is called periodically from all apiservers. -func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { +func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error { r.reconcilingLock.Lock() defer r.reconcilingLock.Unlock() @@ -70,7 +70,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) if err != nil { - e = &api.Endpoints{ + e = &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: serviceName, Namespace: metav1.NamespaceDefault, @@ -79,8 +79,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i } if errors.IsNotFound(err) { // Simply create non-existing endpoints for the service. - e.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: ip.String()}}, + e.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: ip.String()}}, Ports: endpointPorts, }} _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e) @@ -92,8 +92,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts) if !formatCorrect { // Something is egregiously wrong, just re-make the endpoints record. - e.Subsets = []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: ip.String()}}, + e.Subsets = []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: ip.String()}}, Ports: endpointPorts, }} glog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e) @@ -105,10 +105,10 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i } if !ipCorrect { // We *always* add our own IP address. - e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, api.EndpointAddress{IP: ip.String()}) + e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, corev1.EndpointAddress{IP: ip.String()}) // Lexicographic order is retained by this step. - e.Subsets = endpoints.RepackSubsets(e.Subsets) + e.Subsets = endpointsv1.RepackSubsets(e.Subsets) // If too many IP addresses, remove the ones lexicographically after our // own IP address. Given the requirements stated at the top of @@ -137,7 +137,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i return err } -func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { +func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { r.reconcilingLock.Lock() defer r.reconcilingLock.Unlock() r.stopReconcilingCalled = true @@ -152,14 +152,14 @@ func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip n } // Remove our IP from the list of addresses - new := []api.EndpointAddress{} + new := []corev1.EndpointAddress{} for _, addr := range e.Subsets[0].Addresses { if addr.IP != ip.String() { new = append(new, addr) } } e.Subsets[0].Addresses = new - e.Subsets = endpoints.RepackSubsets(e.Subsets) + e.Subsets = endpointsv1.RepackSubsets(e.Subsets) err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { _, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) return err @@ -175,7 +175,7 @@ func (r *masterCountEndpointReconciler) StopReconciling(serviceName string, ip n // of addresses is less than or equal to the master count. // * portsCorrect is true when endpoint ports exactly match provided ports. // portsCorrect is only evaluated when reconcilePorts is set to true. -func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) { +func checkEndpointSubsetFormat(e *corev1.Endpoints, ip string, ports []corev1.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) { if len(e.Subsets) != 1 { return false, false, false } @@ -214,7 +214,7 @@ func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.Endpoint // * All apiservers MUST use GetMasterServiceUpdateIfNeeded and only // GetMasterServiceUpdateIfNeeded to manage service attributes // * updateMasterService is called periodically from all apiservers. -func GetMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.ServicePort, serviceType api.ServiceType) (s *api.Service, updated bool) { +func GetMasterServiceUpdateIfNeeded(svc *corev1.Service, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType) (s *corev1.Service, updated bool) { // Determine if the service is in the format we expect // (servicePorts are present and service type matches) formatCorrect := checkServiceFormat(svc, servicePorts, serviceType) @@ -229,7 +229,7 @@ func GetMasterServiceUpdateIfNeeded(svc *api.Service, servicePorts []api.Service // Determine if the service is in the correct format // GetMasterServiceUpdateIfNeeded expects (servicePorts are correct // and service type matches). -func checkServiceFormat(s *api.Service, ports []api.ServicePort, serviceType api.ServiceType) (formatCorrect bool) { +func checkServiceFormat(s *corev1.Service, ports []corev1.ServicePort, serviceType corev1.ServiceType) (formatCorrect bool) { if s.Spec.Type != serviceType { return false } diff --git a/pkg/master/reconcilers/none.go b/pkg/master/reconcilers/none.go index dce38c2a66a..9bd4ee5ad7f 100644 --- a/pkg/master/reconcilers/none.go +++ b/pkg/master/reconcilers/none.go @@ -18,7 +18,7 @@ limitations under the License. package reconcilers import ( - api "k8s.io/kubernetes/pkg/apis/core" + corev1 "k8s.io/api/core/v1" "net" ) @@ -32,11 +32,11 @@ func NewNoneEndpointReconciler() EndpointReconciler { } // ReconcileEndpoints noop reconcile -func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { +func (r *noneEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error { return nil } // StopReconciling noop reconcile -func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { +func (r *noneEndpointReconciler) StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error { return nil } diff --git a/pkg/master/reconcilers/reconcilers.go b/pkg/master/reconcilers/reconcilers.go index 8bbabc65901..0cfb9a0aaf8 100644 --- a/pkg/master/reconcilers/reconcilers.go +++ b/pkg/master/reconcilers/reconcilers.go @@ -18,7 +18,7 @@ limitations under the License. package reconcilers import ( - api "k8s.io/kubernetes/pkg/apis/core" + corev1 "k8s.io/api/core/v1" "net" ) @@ -34,8 +34,8 @@ type EndpointReconciler interface { // * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the // endpoints for their {rw, ro} services. // * ReconcileEndpoints is called periodically from all apiservers. - ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error - StopReconciling(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error + ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error + StopReconciling(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error } // Type the reconciler type diff --git a/pkg/registry/core/service/ipallocator/controller/BUILD b/pkg/registry/core/service/ipallocator/controller/BUILD index c10229edbb7..8921bbc23ad 100644 --- a/pkg/registry/core/service/ipallocator/controller/BUILD +++ b/pkg/registry/core/service/ipallocator/controller/BUILD @@ -13,8 +13,7 @@ go_library( deps = [ "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/core:go_default_library", - "//pkg/apis/core/helper:go_default_library", - "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", + "//pkg/apis/core/v1/helper:go_default_library", "//pkg/registry/core/rangeallocation:go_default_library", "//pkg/registry/core/service/ipallocator:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -22,6 +21,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", ], @@ -33,9 +33,10 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/apis/core:go_default_library", - "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/registry/core/service/ipallocator:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", ], ) diff --git a/pkg/registry/core/service/ipallocator/controller/repair.go b/pkg/registry/core/service/ipallocator/controller/repair.go index b4aaf1c289c..11bb117d743 100644 --- a/pkg/registry/core/service/ipallocator/controller/repair.go +++ b/pkg/registry/core/service/ipallocator/controller/repair.go @@ -26,12 +26,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/apis/core/helper" - coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" + "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/registry/core/rangeallocation" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" ) @@ -53,7 +53,7 @@ import ( // TODO: perform repair? type Repair struct { interval time.Duration - serviceClient coreclient.ServicesGetter + serviceClient corev1client.ServicesGetter network *net.IPNet alloc rangeallocation.RangeRegistry leaks map[string]int // counter per leaked IP @@ -66,9 +66,9 @@ const numRepairsBeforeLeakCleanup = 3 // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster // and generates informational warnings for a cluster that is not in sync. -func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair { +func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair { eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")}) + eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")}) recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"}) return &Repair{ diff --git a/pkg/registry/core/service/ipallocator/controller/repair_test.go b/pkg/registry/core/service/ipallocator/controller/repair_test.go index 0bbd6f5959c..af2e59b22f4 100644 --- a/pkg/registry/core/service/ipallocator/controller/repair_test.go +++ b/pkg/registry/core/service/ipallocator/controller/repair_test.go @@ -22,9 +22,10 @@ import ( "strings" "testing" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" ) @@ -134,29 +135,29 @@ func TestRepairWithExisting(t *testing.T) { } fakeClient := fake.NewSimpleClientset( - &api.Service{ + &corev1.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one"}, - Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.1"}, }, - &api.Service{ + &corev1.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "two", Name: "two"}, - Spec: api.ServiceSpec{ClusterIP: "192.168.1.100"}, + Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.100"}, }, - &api.Service{ // outside CIDR, will be dropped + &corev1.Service{ // outside CIDR, will be dropped ObjectMeta: metav1.ObjectMeta{Namespace: "three", Name: "three"}, - Spec: api.ServiceSpec{ClusterIP: "192.168.0.1"}, + Spec: corev1.ServiceSpec{ClusterIP: "192.168.0.1"}, }, - &api.Service{ // empty, ignored + &corev1.Service{ // empty, ignored ObjectMeta: metav1.ObjectMeta{Namespace: "four", Name: "four"}, - Spec: api.ServiceSpec{ClusterIP: ""}, + Spec: corev1.ServiceSpec{ClusterIP: ""}, }, - &api.Service{ // duplicate, dropped + &corev1.Service{ // duplicate, dropped ObjectMeta: metav1.ObjectMeta{Namespace: "five", Name: "five"}, - Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.1"}, }, - &api.Service{ // headless + &corev1.Service{ // headless ObjectMeta: metav1.ObjectMeta{Namespace: "six", Name: "six"}, - Spec: api.ServiceSpec{ClusterIP: "None"}, + Spec: corev1.ServiceSpec{ClusterIP: "None"}, }, ) diff --git a/pkg/registry/core/service/portallocator/controller/BUILD b/pkg/registry/core/service/portallocator/controller/BUILD index 8a6b511eda6..c4ea3f284d0 100644 --- a/pkg/registry/core/service/portallocator/controller/BUILD +++ b/pkg/registry/core/service/portallocator/controller/BUILD @@ -13,7 +13,6 @@ go_library( deps = [ "//pkg/api/legacyscheme:go_default_library", "//pkg/apis/core:go_default_library", - "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library", "//pkg/registry/core/rangeallocation:go_default_library", "//pkg/registry/core/service/portallocator:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", @@ -22,6 +21,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", ], @@ -33,10 +33,11 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/apis/core:go_default_library", - "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/registry/core/service/portallocator:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", ], ) diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go index c1a4f0dadbd..20485cc0442 100644 --- a/pkg/registry/core/service/portallocator/controller/repair.go +++ b/pkg/registry/core/service/portallocator/controller/repair.go @@ -21,16 +21,17 @@ import ( "time" "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" - coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" "k8s.io/kubernetes/pkg/registry/core/rangeallocation" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" ) @@ -38,7 +39,7 @@ import ( // See ipallocator/controller/repair.go; this is a copy for ports. type Repair struct { interval time.Duration - serviceClient coreclient.ServicesGetter + serviceClient corev1client.ServicesGetter portRange net.PortRange alloc rangeallocation.RangeRegistry leaks map[int]int // counter per leaked port @@ -51,9 +52,9 @@ const numRepairsBeforeLeakCleanup = 3 // NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster // and generates informational warnings for a cluster that is not in sync. -func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, eventClient coreclient.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair { +func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair { eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&coreclient.EventSinkImpl{Interface: eventClient.Events("")}) + eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")}) recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "portallocator-repair-controller"}) return &Repair{ @@ -196,7 +197,7 @@ func (c *Repair) runOnce() error { return nil } -func collectServiceNodePorts(service *api.Service) []int { +func collectServiceNodePorts(service *corev1.Service) []int { servicePorts := []int{} for i := range service.Spec.Ports { servicePort := &service.Spec.Ports[i] diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go index 48c41aa989a..0df94f898d7 100644 --- a/pkg/registry/core/service/portallocator/controller/repair_test.go +++ b/pkg/registry/core/service/portallocator/controller/repair_test.go @@ -21,10 +21,11 @@ import ( "strings" "testing" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/net" + "k8s.io/client-go/kubernetes/fake" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" ) @@ -134,39 +135,39 @@ func TestRepairWithExisting(t *testing.T) { } fakeClient := fake.NewSimpleClientset( - &api.Service{ + &corev1.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one"}, - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{{NodePort: 111}}, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{NodePort: 111}}, }, }, - &api.Service{ + &corev1.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "two", Name: "two"}, - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{{NodePort: 122}, {NodePort: 133}}, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{NodePort: 122}, {NodePort: 133}}, }, }, - &api.Service{ // outside range, will be dropped + &corev1.Service{ // outside range, will be dropped ObjectMeta: metav1.ObjectMeta{Namespace: "three", Name: "three"}, - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{{NodePort: 201}}, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{NodePort: 201}}, }, }, - &api.Service{ // empty, ignored + &corev1.Service{ // empty, ignored ObjectMeta: metav1.ObjectMeta{Namespace: "four", Name: "four"}, - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{{}}, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{}}, }, }, - &api.Service{ // duplicate, dropped + &corev1.Service{ // duplicate, dropped ObjectMeta: metav1.ObjectMeta{Namespace: "five", Name: "five"}, - Spec: api.ServiceSpec{ - Ports: []api.ServicePort{{NodePort: 111}}, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{{NodePort: 111}}, }, }, - &api.Service{ + &corev1.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "six", Name: "six"}, - Spec: api.ServiceSpec{ + Spec: corev1.ServiceSpec{ HealthCheckNodePort: 144, }, },