Merge pull request #119064 from sttts/sttts-move-kube-service-controller

pkg/controlplane: move bootstrap controller to controllers/kubernetesservice
This commit is contained in:
Kubernetes Prow Robot 2023-07-04 08:36:53 -07:00 committed by GitHub
commit cb7acfd46e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 581 additions and 582 deletions

View File

@ -1,43 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controlplane
import (
"context"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
)
func createNamespaceIfNeeded(c corev1client.NamespacesGetter, ns string) error {
if _, err := c.Namespaces().Get(context.TODO(), ns, metav1.GetOptions{}); err == nil {
// the namespace already exists
return nil
}
newNs := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: ns,
Namespace: "",
},
}
_, err := c.Namespaces().Create(context.TODO(), newNs, metav1.CreateOptions{})
if err != nil && errors.IsAlreadyExists(err) {
err = nil
}
return err
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package controlplane package kubernetesservice
import ( import (
"context" "context"
@ -31,20 +31,18 @@ import (
utilnet "k8s.io/apimachinery/pkg/util/net" utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers" "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controlplane/reconcilers" "k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation" "k8s.io/kubernetes/pkg/registry/core/rangeallocation"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller" servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller" portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
"k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/async"
netutils "k8s.io/utils/net"
) )
const ( const (
@ -55,91 +53,59 @@ const (
// controller loops, which manage creating the "kubernetes" service and // controller loops, which manage creating the "kubernetes" service and
// provide the IP repair check on service IPs // provide the IP repair check on service IPs
type Controller struct { type Controller struct {
client kubernetes.Interface Config
informers informers.SharedInformerFactory RangeRegistries
runner *async.Runner
}
type RangeRegistries struct {
ServiceClusterIPRegistry rangeallocation.RangeRegistry ServiceClusterIPRegistry rangeallocation.RangeRegistry
ServiceClusterIPRange net.IPNet
SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry
SecondaryServiceClusterIPRange net.IPNet ServiceNodePortRegistry rangeallocation.RangeRegistry
}
ServiceClusterIPInterval time.Duration type Config struct {
Client kubernetes.Interface
Informers informers.SharedInformerFactory
ServiceNodePortRegistry rangeallocation.RangeRegistry KubernetesService
ServiceNodePortInterval time.Duration ClusterIP
ServiceNodePortRange utilnet.PortRange NodePort
}
type KubernetesService struct {
PublicIP net.IP
EndpointReconciler reconcilers.EndpointReconciler EndpointReconciler reconcilers.EndpointReconciler
EndpointInterval time.Duration EndpointInterval time.Duration
PublicIP net.IP
// ServiceIP indicates where the kubernetes service will live. It may not be nil. // ServiceIP indicates where the kubernetes service will live. It may not be nil.
ServiceIP net.IP ServiceIP net.IP
ServicePort int ServicePort int
PublicServicePort int PublicServicePort int
KubernetesServiceNodePort int KubernetesServiceNodePort int
runner *async.Runner
} }
// NewBootstrapController returns a controller for watching the core capabilities of the master type ClusterIP struct {
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, client kubernetes.Interface) (*Controller, error) { ServiceClusterIPRange net.IPNet
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort() SecondaryServiceClusterIPRange net.IPNet
if err != nil { ServiceClusterIPInterval time.Duration
return nil, fmt.Errorf("failed to get listener address: %w", err) }
}
// The "kubernetes.default" Service is SingleStack based on the configured ServiceIPRange. type NodePort struct {
// If the bootstrap controller reconcile the kubernetes.default Service and Endpoints, it must ServiceNodePortInterval time.Duration
// guarantee that the Service ClusterIP and the associated Endpoints have the same IP family, or ServiceNodePortRange utilnet.PortRange
// it will not work for clients because of the IP family mismatch. }
// TODO: revisit for dual-stack https://github.com/kubernetes/enhancements/issues/2438
if c.ExtraConfig.EndpointReconcilerType != reconcilers.NoneEndpointReconcilerType {
if netutils.IsIPv4CIDR(&c.ExtraConfig.ServiceIPRange) != netutils.IsIPv4(c.GenericConfig.PublicAddress) {
return nil, fmt.Errorf("service IP family %q must match public address family %q", c.ExtraConfig.ServiceIPRange.String(), c.GenericConfig.PublicAddress.String())
}
}
// New returns a controller for watching the kubernetes service endpoints.
func New(config Config, rangeRegistries RangeRegistries) (*Controller, error) {
return &Controller{ return &Controller{
client: client, Config: config,
informers: c.ExtraConfig.VersionedInformers, RangeRegistries: rangeRegistries,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator,
SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceClusterIPInterval: c.ExtraConfig.RepairServicesInterval,
ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
ServiceNodePortInterval: c.ExtraConfig.RepairServicesInterval,
PublicIP: c.GenericConfig.PublicAddress,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
}, nil }, nil
} }
// PostStartHook initiates the core controller loops that must exist for bootstrapping.
func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
c.Start()
return nil
}
// PreShutdownHook triggers the actions needed to shut down the API Server cleanly.
func (c *Controller) PreShutdownHook() error {
c.Stop()
return nil
}
// Start begins the core controller loops that must exist for bootstrapping // Start begins the core controller loops that must exist for bootstrapping
// a cluster. // a cluster.
func (c *Controller) Start() { func (c *Controller) Start() {
@ -155,7 +121,7 @@ func (c *Controller) Start() {
klog.Errorf("Error removing old endpoints from kubernetes service: %v", err) klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
} }
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.client.CoreV1(), c.client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry) repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.Client.CoreV1(), c.Client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
// We start both repairClusterIPs and repairNodePorts to ensure repair // We start both repairClusterIPs and repairNodePorts to ensure repair
// loops of ClusterIPs and NodePorts. // loops of ClusterIPs and NodePorts.
@ -177,8 +143,8 @@ func (c *Controller) Start() {
var runRepairClusterIPs func(stopCh chan struct{}) var runRepairClusterIPs func(stopCh chan struct{})
if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) { if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval,
c.client.CoreV1(), c.Client.CoreV1(),
c.client.EventsV1(), c.Client.EventsV1(),
&c.ServiceClusterIPRange, &c.ServiceClusterIPRange,
c.ServiceClusterIPRegistry, c.ServiceClusterIPRegistry,
&c.SecondaryServiceClusterIPRange, &c.SecondaryServiceClusterIPRange,
@ -188,11 +154,11 @@ func (c *Controller) Start() {
} }
} else { } else {
repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval, repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval,
c.client, c.Client,
&c.ServiceClusterIPRange, &c.ServiceClusterIPRange,
&c.SecondaryServiceClusterIPRange, &c.SecondaryServiceClusterIPRange,
c.informers.Core().V1().Services(), c.Informers.Core().V1().Services(),
c.informers.Networking().V1alpha1().IPAddresses(), c.Informers.Networking().V1alpha1().IPAddresses(),
) )
runRepairClusterIPs = func(stopCh chan struct{}) { runRepairClusterIPs = func(stopCh chan struct{}) {
repairClusterIPs.RunUntil(wg.Done, stopCh) repairClusterIPs.RunUntil(wg.Done, stopCh)
@ -247,7 +213,7 @@ func (c *Controller) RunKubernetesService(ch chan struct{}) {
// wait until process is ready // wait until process is ready
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
var code int var code int
c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code) c.Client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
return code == http.StatusOK, nil return code == http.StatusOK, nil
}, ch) }, ch)
@ -267,8 +233,15 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error {
// TODO: when it becomes possible to change this stuff, // TODO: when it becomes possible to change this stuff,
// stop polling and start watching. // stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master. // TODO: add endpoints of all replicas, not just the elected master.
if err := createNamespaceIfNeeded(c.client.CoreV1(), metav1.NamespaceDefault); err != nil { if _, err := c.Client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil {
return err if _, err := c.Client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: metav1.NamespaceDefault,
Namespace: "",
},
}, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) {
return err
}
} }
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https") servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https")
@ -313,12 +286,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.
// CreateOrUpdateMasterServiceIfNeeded will create the specified service if it // CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist. // doesn't already exist.
func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error { func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil { if s, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil {
// The service already exists. // The service already exists.
if reconcile { if reconcile {
if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated { if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
klog.Warningf("Resetting master service %q to %#v", serviceName, svc) klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{}) _, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
return err return err
} }
} }
@ -342,7 +315,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser
}, },
} }
_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) _, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
if errors.IsAlreadyExists(err) { if errors.IsAlreadyExists(err) {
return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile) return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
} }

View File

@ -0,0 +1,436 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubernetesservice
import (
"reflect"
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
netutils "k8s.io/utils/net"
)
func TestCreateOrUpdateMasterService(t *testing.T) {
singleStack := corev1.IPFamilyPolicySingleStack
ns := metav1.NamespaceDefault
om := func(name string) metav1.ObjectMeta {
return metav1.ObjectMeta{Namespace: ns, Name: name}
}
createTests := []struct {
testName string
serviceName string
servicePorts []corev1.ServicePort
serviceType corev1.ServiceType
expectCreate *corev1.Service // nil means none expected
}{
{
testName: "service does not exist",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
expectCreate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
IPFamilyPolicy: &singleStack,
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
}
for _, test := range createTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
master.Client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "create" {
creates = append(creates, action.(core.CreateAction))
}
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creations: %v", test.testName, creates)
} else {
obj := creates[0].GetObject()
if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectCreate == nil && len(creates) > 1 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
}
reconcileTests := []struct {
testName string
serviceName string
servicePorts []corev1.ServicePort
serviceType corev1.ServiceType
service *corev1.Service
expectUpdate *corev1.Service // nil means none expected
}{
{
testName: "service definition wrong port",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8000, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition missing port",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
{Name: "baz", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt32(1000)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
{Name: "baz", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition incorrect port",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "bar", Port: 1000, Protocol: "UDP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition incorrect port name",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 1000, Protocol: "UDP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition incorrect target port",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition incorrect protocol",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "UDP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition has incorrect type",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeNodePort,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition satisfies",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: nil,
},
}
for _, test := range reconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.Client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
}
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
}
nonReconcileTests := []struct {
testName string
serviceName string
servicePorts []corev1.ServicePort
serviceType corev1.ServiceType
service *corev1.Service
expectUpdate *corev1.Service // nil means none expected
}{
{
testName: "service definition wrong port, no expected update",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: nil,
},
}
for _, test := range nonReconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.Client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
}
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
}
}

View File

@ -17,430 +17,16 @@ limitations under the License.
package controlplane package controlplane
import ( import (
"reflect"
"testing" "testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
genericapiserver "k8s.io/apiserver/pkg/server" genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
) )
func TestCreateOrUpdateMasterService(t *testing.T) {
singleStack := corev1.IPFamilyPolicySingleStack
ns := metav1.NamespaceDefault
om := func(name string) metav1.ObjectMeta {
return metav1.ObjectMeta{Namespace: ns, Name: name}
}
createTests := []struct {
testName string
serviceName string
servicePorts []corev1.ServicePort
serviceType corev1.ServiceType
expectCreate *corev1.Service // nil means none expected
}{
{
testName: "service does not exist",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
expectCreate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
IPFamilyPolicy: &singleStack,
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
}
for _, test := range createTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset()
master.client = fakeClient
master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "create" {
creates = append(creates, action.(core.CreateAction))
}
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creations: %v", test.testName, creates)
} else {
obj := creates[0].GetObject()
if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectCreate == nil && len(creates) > 1 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
}
reconcileTests := []struct {
testName string
serviceName string
servicePorts []corev1.ServicePort
serviceType corev1.ServiceType
service *corev1.Service
expectUpdate *corev1.Service // nil means none expected
}{
{
testName: "service definition wrong port",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8000, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition missing port",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
{Name: "baz", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt32(1000)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
{Name: "baz", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition incorrect port",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "bar", Port: 1000, Protocol: "UDP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition incorrect port name",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 1000, Protocol: "UDP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition incorrect target port",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition incorrect protocol",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "UDP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition has incorrect type",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeNodePort,
},
},
expectUpdate: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
},
{
testName: "service definition satisfies",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: nil,
},
}
for _, test := range reconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
}
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
}
nonReconcileTests := []struct {
testName string
serviceName string
servicePorts []corev1.ServicePort
serviceType corev1.ServiceType
service *corev1.Service
expectUpdate *corev1.Service // nil means none expected
}{
{
testName: "service definition wrong port, no expected update",
serviceName: "foo",
servicePorts: []corev1.ServicePort{
{Name: "foo", Port: 8080, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)},
},
serviceType: corev1.ServiceTypeClusterIP,
service: &corev1.Service{
ObjectMeta: om("foo"),
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{
{Name: "foo", Port: 1000, Protocol: "TCP", TargetPort: intstr.FromInt32(1000)},
},
Selector: nil,
ClusterIP: "1.2.3.4",
SessionAffinity: corev1.ServiceAffinityNone,
Type: corev1.ServiceTypeClusterIP,
},
},
expectUpdate: nil,
},
}
for _, test := range nonReconcileTests {
master := Controller{}
fakeClient := fake.NewSimpleClientset(test.service)
master.client = fakeClient
err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(core.UpdateAction))
}
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else {
obj := updates[0].GetObject()
if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
}
}
func Test_completedConfig_NewBootstrapController(t *testing.T) { func Test_completedConfig_NewBootstrapController(t *testing.T) {
_, ipv4cidr, err := netutils.ParseCIDRSloppy("192.168.0.0/24") _, ipv4cidr, err := netutils.ParseCIDRSloppy("192.168.0.0/24")
if err != nil { if err != nil {
t.Fatalf("Unexpected error %v", err) t.Fatalf("Unexpected error %v", err)
@ -455,8 +41,7 @@ func Test_completedConfig_NewBootstrapController(t *testing.T) {
ipv6address := netutils.ParseIPSloppy("2001:db8::1") ipv6address := netutils.ParseIPSloppy("2001:db8::1")
type args struct { type args struct {
legacyRESTStorage corerest.LegacyRESTStorage client kubernetes.Interface
client kubernetes.Interface
} }
tests := []struct { tests := []struct {
name string name string
@ -580,7 +165,7 @@ func Test_completedConfig_NewBootstrapController(t *testing.T) {
GenericConfig: tt.config.Complete(nil), GenericConfig: tt.config.Complete(nil),
ExtraConfig: tt.extraConfig, ExtraConfig: tt.extraConfig,
} }
_, err := c.NewBootstrapController(tt.args.legacyRESTStorage, tt.args.client) _, err := c.newKubernetesServiceControllerConfig(tt.args.client)
if (err != nil) != tt.wantErr { if (err != nil) != tt.wantErr {
t.Errorf("completedConfig.NewBootstrapController() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("completedConfig.NewBootstrapController() error = %v, wantErr %v", err, tt.wantErr)
return return

View File

@ -80,6 +80,7 @@ import (
"k8s.io/kubernetes/pkg/controlplane/apiserver/options" "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc" "k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust" "k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice"
"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking" "k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces" "k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
"k8s.io/kubernetes/pkg/controlplane/reconcilers" "k8s.io/kubernetes/pkg/controlplane/reconcilers"
@ -88,6 +89,7 @@ import (
"k8s.io/kubernetes/pkg/routes" "k8s.io/kubernetes/pkg/routes"
"k8s.io/kubernetes/pkg/serviceaccount" "k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/utils/clock" "k8s.io/utils/clock"
netutils "k8s.io/utils/net"
// RESTStorage installers // RESTStorage installers
admissionregistrationrest "k8s.io/kubernetes/pkg/registry/admissionregistration/rest" admissionregistrationrest "k8s.io/kubernetes/pkg/registry/admissionregistration/rest"
@ -605,12 +607,16 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
return nil return nil
}) })
bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, client) kubenetesserviceConfig, err := c.newKubernetesServiceControllerConfig(client)
if err != nil {
return err
}
bootstrapController, err := kubernetesservice.New(*kubenetesserviceConfig, legacyRESTStorage)
if err != nil { if err != nil {
return fmt.Errorf("error creating bootstrap controller: %v", err) return fmt.Errorf("error creating bootstrap controller: %v", err)
} }
m.GenericAPIServer.AddPostStartHookOrDie(controllerName, bootstrapController.PostStartHook) m.GenericAPIServer.AddPostStartHookOrDie(controllerName, func(genericapiserver.PostStartHookContext) error { bootstrapController.Start(); return nil })
m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, bootstrapController.PreShutdownHook) m.GenericAPIServer.AddPreShutdownHookOrDie(controllerName, func() error { bootstrapController.Stop(); return nil })
if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil { if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
return fmt.Errorf("error in registering group versions: %v", err) return fmt.Errorf("error in registering group versions: %v", err)
@ -618,6 +624,51 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
return nil return nil
} }
// newKubernetesServiceControllerConfig returns a configuration for the kubernetes service controller.
func (c completedConfig) newKubernetesServiceControllerConfig(client kubernetes.Interface) (*kubernetesservice.Config, error) {
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
if err != nil {
return nil, fmt.Errorf("failed to get listener address: %w", err)
}
// The "kubernetes.default" Service is SingleStack based on the configured ServiceIPRange.
// If the bootstrap controller reconcile the kubernetes.default Service and Endpoints, it must
// guarantee that the Service ClusterIP and the associated Endpoints have the same IP family, or
// it will not work for clients because of the IP family mismatch.
// TODO: revisit for dual-stack https://github.com/kubernetes/enhancements/issues/2438
if c.ExtraConfig.EndpointReconcilerType != reconcilers.NoneEndpointReconcilerType {
if netutils.IsIPv4CIDR(&c.ExtraConfig.ServiceIPRange) != netutils.IsIPv4(c.GenericConfig.PublicAddress) {
return nil, fmt.Errorf("service IP family %q must match public address family %q", c.ExtraConfig.ServiceIPRange.String(), c.GenericConfig.PublicAddress.String())
}
}
return &kubernetesservice.Config{
Client: client,
Informers: c.ExtraConfig.VersionedInformers,
KubernetesService: kubernetesservice.KubernetesService{
PublicIP: c.GenericConfig.PublicAddress,
EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.ExtraConfig.EndpointReconcilerConfig.Interval,
ServiceIP: c.ExtraConfig.APIServerServiceIP,
ServicePort: c.ExtraConfig.APIServerServicePort,
PublicServicePort: publicServicePort,
KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
},
ClusterIP: kubernetesservice.ClusterIP{
ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange,
SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
ServiceClusterIPInterval: c.ExtraConfig.RepairServicesInterval,
},
NodePort: kubernetesservice.NodePort{
ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange,
ServiceNodePortInterval: c.ExtraConfig.RepairServicesInterval,
},
}, nil
}
// RESTStorageProvider is a factory type for REST storage. // RESTStorageProvider is a factory type for REST storage.
type RESTStorageProvider interface { type RESTStorageProvider interface {
GroupName() string GroupName() string

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cluster/ports" "k8s.io/kubernetes/pkg/cluster/ports"
"k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/registry/core/componentstatus" "k8s.io/kubernetes/pkg/registry/core/componentstatus"
@ -100,7 +101,7 @@ type LegacyRESTStorage struct {
ServiceNodePortAllocator rangeallocation.RangeRegistry ServiceNodePortAllocator rangeallocation.RangeRegistry
} }
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) { func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (kubernetesservice.RangeRegistries, genericapiserver.APIGroupInfo, error) {
apiGroupInfo := genericapiserver.APIGroupInfo{ apiGroupInfo := genericapiserver.APIGroupInfo{
PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""), PrioritizedVersions: legacyscheme.Scheme.PrioritizedVersionsForGroup(""),
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{}, VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
@ -111,58 +112,58 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
podDisruptionClient, err := policyclient.NewForConfig(c.LoopbackClientConfig) podDisruptionClient, err := policyclient.NewForConfig(c.LoopbackClientConfig)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
restStorage := LegacyRESTStorage{} restStorage := kubernetesservice.RangeRegistries{}
podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter) podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds())) eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter) limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter) resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
secretStorage, err := secretstore.NewREST(restOptionsGetter) secretStorage, err := secretstore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter) persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter) persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
configMapStorage, err := configmapstore.NewREST(restOptionsGetter) configMapStorage, err := configmapstore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter) namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter) endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport) nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
podStorage, err := podstore.NewStorage( podStorage, err := podstore.NewStorage(
@ -172,7 +173,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
podDisruptionClient, podDisruptionClient,
) )
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
var serviceAccountStorage *serviceaccountstore.REST var serviceAccountStorage *serviceaccountstore.REST
@ -182,18 +183,18 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, 0, nil, nil, false) serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, nil, nil, 0, nil, nil, false)
} }
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
var serviceClusterIPRegistry rangeallocation.RangeRegistry var serviceClusterIPRegistry rangeallocation.RangeRegistry
serviceClusterIPRange := c.ServiceIPRange serviceClusterIPRange := c.ServiceIPRange
if serviceClusterIPRange.IP == nil { if serviceClusterIPRange.IP == nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is missing") return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("service clusterIPRange is missing")
} }
serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services")) serviceStorageConfig, err := c.StorageFactory.NewConfig(api.Resource("services"))
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
var serviceClusterIPAllocator, secondaryServiceClusterIPAllocator ipallocator.Interface var serviceClusterIPAllocator, secondaryServiceClusterIPAllocator ipallocator.Interface
@ -210,21 +211,21 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
return etcd, nil return etcd, nil
}) })
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err) return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
} }
} else { } else {
networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig) networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
serviceClusterIPAllocator, err = ipallocator.NewIPAllocator(&serviceClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses()) serviceClusterIPAllocator, err = ipallocator.NewIPAllocator(&serviceClusterIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err) return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster IP allocator: %v", err)
} }
} }
serviceClusterIPAllocator.EnableMetrics() serviceClusterIPAllocator.EnableMetrics()
restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry restStorage.ServiceClusterIPRegistry = serviceClusterIPRegistry
// allocator for secondary service ip range // allocator for secondary service ip range
if c.SecondaryServiceIPRange.IP != nil { if c.SecondaryServiceIPRange.IP != nil {
@ -242,20 +243,20 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
return etcd, nil return etcd, nil
}) })
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err) return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
} }
} else { } else {
networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig) networkingv1alphaClient, err := networkingv1alpha1client.NewForConfig(c.LoopbackClientConfig)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
secondaryServiceClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses()) secondaryServiceClusterIPAllocator, err = ipallocator.NewIPAllocator(&c.SecondaryServiceIPRange, networkingv1alphaClient, c.Informers.Networking().V1alpha1().IPAddresses())
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err) return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err)
} }
} }
secondaryServiceClusterIPAllocator.EnableMetrics() secondaryServiceClusterIPAllocator.EnableMetrics()
restStorage.SecondaryServiceClusterIPAllocator = secondaryServiceClusterIPRegistry restStorage.SecondaryServiceClusterIPRegistry = secondaryServiceClusterIPRegistry
} }
var serviceNodePortRegistry rangeallocation.RangeRegistry var serviceNodePortRegistry rangeallocation.RangeRegistry
@ -270,14 +271,14 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
return etcd, nil return etcd, nil
}) })
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err) return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster port allocator: %v", err)
} }
serviceNodePortAllocator.EnableMetrics() serviceNodePortAllocator.EnableMetrics()
restStorage.ServiceNodePortAllocator = serviceNodePortRegistry restStorage.ServiceNodePortRegistry = serviceNodePortRegistry
controllerStorage, err := controllerstore.NewStorage(restOptionsGetter) controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
serviceIPAllocators := map[api.IPFamily]ipallocator.Interface{ serviceIPAllocators := map[api.IPFamily]ipallocator.Interface{
@ -296,7 +297,7 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource
podStorage.Pod, podStorage.Pod,
c.ProxyTransport) c.ProxyTransport)
if err != nil { if err != nil {
return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err return kubernetesservice.RangeRegistries{}, genericapiserver.APIGroupInfo{}, err
} }
storage := map[string]rest.Storage{} storage := map[string]rest.Storage{}

View File

@ -39,7 +39,7 @@ type REST struct {
} }
// NewREST returns a RESTStorage object that will work against service accounts. // NewREST returns a RESTStorage object that will work against service accounts.
func NewREST(optsGetter generic.RESTOptionsGetter, issuer token.TokenGenerator, auds authenticator.Audiences, max time.Duration, podStorage, secretStorage *genericregistry.Store, extendExpiration bool) (*REST, error) { func NewREST(optsGetter generic.RESTOptionsGetter, issuer token.TokenGenerator, auds authenticator.Audiences, max time.Duration, podStorage, secretStorage rest.Getter, extendExpiration bool) (*REST, error) {
store := &genericregistry.Store{ store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &api.ServiceAccount{} }, NewFunc: func() runtime.Object { return &api.ServiceAccount{} },
NewListFunc: func() runtime.Object { return &api.ServiceAccountList{} }, NewListFunc: func() runtime.Object { return &api.ServiceAccountList{} },

View File

@ -50,9 +50,9 @@ func (r *TokenREST) Destroy() {
} }
type TokenREST struct { type TokenREST struct {
svcaccts getter svcaccts rest.Getter
pods getter pods rest.Getter
secrets getter secrets rest.Getter
issuer token.TokenGenerator issuer token.TokenGenerator
auds authenticator.Audiences auds authenticator.Audiences
audsSet sets.String audsSet sets.String
@ -198,10 +198,6 @@ func (r *TokenREST) GroupVersionKind(schema.GroupVersion) schema.GroupVersionKin
return gvk return gvk
} }
type getter interface {
Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error)
}
// newContext return a copy of ctx in which new RequestInfo is set // newContext return a copy of ctx in which new RequestInfo is set
func newContext(ctx context.Context, resource, name string, gvk schema.GroupVersionKind) context.Context { func newContext(ctx context.Context, resource, name string, gvk schema.GroupVersionKind) context.Context {
oldInfo, found := genericapirequest.RequestInfoFrom(ctx) oldInfo, found := genericapirequest.RequestInfoFrom(ctx)