diff --git a/cmd/kube-apiserver/app/options/options.go b/cmd/kube-apiserver/app/options/options.go index ac7ccad2d3e..e15f1052c6b 100644 --- a/cmd/kube-apiserver/app/options/options.go +++ b/cmd/kube-apiserver/app/options/options.go @@ -56,10 +56,16 @@ type ServerRunOptions struct { KubeletConfig kubeletclient.KubeletClientConfig KubernetesServiceNodePort int MaxConnectionBytesPerSec int64 - ServiceClusterIPRange net.IPNet // TODO: make this a list - ServiceNodePortRange utilnet.PortRange - SSHKeyfile string - SSHUser string + // ServiceClusterIPRange is mapped to input provided by user + ServiceClusterIPRanges string + //PrimaryServiceClusterIPRange and SecondaryServiceClusterIPRange are the results + // of parsing ServiceClusterIPRange into actual values + PrimaryServiceClusterIPRange net.IPNet + SecondaryServiceClusterIPRange net.IPNet + + ServiceNodePortRange utilnet.PortRange + SSHKeyfile string + SSHUser string ProxyClientCertFile string ProxyClientKeyFile string @@ -114,7 +120,7 @@ func NewServerRunOptions() *ServerRunOptions { }, ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange, } - s.ServiceClusterIPRange = kubeoptions.DefaultServiceIPCIDR + s.ServiceClusterIPRanges = kubeoptions.DefaultServiceIPCIDR.String() // Overwrite the default for storage data format. s.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf" @@ -179,7 +185,8 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { "of type NodePort, using this as the value of the port. If zero, the Kubernetes master "+ "service will be of type ClusterIP.") - fs.IPNetVar(&s.ServiceClusterIPRange, "service-cluster-ip-range", s.ServiceClusterIPRange, ""+ + // TODO (khenidak) change documentation as we move IPv6DualStack feature from ALPHA to BETA + fs.StringVar(&s.ServiceClusterIPRanges, "service-cluster-ip-range", s.ServiceClusterIPRanges, ""+ "A CIDR notation IP range from which to assign service cluster IPs. This must not "+ "overlap with any IP ranges assigned to nodes for pods.") diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 9dfd1d25a76..d41d284aaf6 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -118,7 +118,7 @@ func TestAddFlags(t *testing.T) { // This is a snapshot of expected options parsed by args. expected := &ServerRunOptions{ ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange, - ServiceClusterIPRange: kubeoptions.DefaultServiceIPCIDR, + ServiceClusterIPRanges: kubeoptions.DefaultServiceIPCIDR.String(), MasterCount: 5, EndpointReconcilerType: string(reconcilers.LeaseEndpointReconcilerType), AllowPrivileged: false, diff --git a/cmd/kube-apiserver/app/options/validation.go b/cmd/kube-apiserver/app/options/validation.go index d0b1bd0ada9..ae87be5d213 100644 --- a/cmd/kube-apiserver/app/options/validation.go +++ b/cmd/kube-apiserver/app/options/validation.go @@ -19,26 +19,69 @@ package options import ( "errors" "fmt" + "net" + "strings" apiextensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver" utilfeature "k8s.io/apiserver/pkg/util/feature" aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/features" + netutils "k8s.io/utils/net" ) // TODO: Longer term we should read this from some config store, rather than a flag. +// validateClusterIPFlags is expected to be called after Complete() func validateClusterIPFlags(options *ServerRunOptions) []error { var errs []error - if options.ServiceClusterIPRange.IP == nil { - errs = append(errs, errors.New("no --service-cluster-ip-range specified")) + // validate that primary has been processed by user provided values or it has been defaulted + if options.PrimaryServiceClusterIPRange.IP == nil { + errs = append(errs, errors.New("--service-cluster-ip-range must contain at least one valid cidr")) } - var ones, bits = options.ServiceClusterIPRange.Mask.Size() + + serviceClusterIPRangeList := strings.Split(options.ServiceClusterIPRanges, ",") + if len(serviceClusterIPRangeList) > 2 { + errs = append(errs, errors.New("--service-cluster-ip-range must not contain more than two entries")) + } + + // Complete() expected to have set Primary* and Secondary* + // primary CIDR validation + var ones, bits = options.PrimaryServiceClusterIPRange.Mask.Size() if bits-ones > 20 { errs = append(errs, errors.New("specified --service-cluster-ip-range is too large")) } + // Secondary IP validation + secondaryServiceClusterIPRangeUsed := (options.SecondaryServiceClusterIPRange.IP != nil) + if secondaryServiceClusterIPRangeUsed && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { + errs = append(errs, fmt.Errorf("--secondary-service-cluster-ip-range can only be used if %v feature is enabled", string(features.IPv6DualStack))) + } + + // note: While the cluster might be dualstack (i.e. pods with multiple IPs), the user may choose + // to only ingress traffic within and into the cluster on one IP family only. this family is decided + // by the range set on --service-cluster-ip-range. If/when the user decides to use dual stack services + // the Secondary* must be of different IPFamily than --service-cluster-ip-range + if secondaryServiceClusterIPRangeUsed { + // Should be dualstack IPFamily(PrimaryServiceClusterIPRange) != IPFamily(SecondaryServiceClusterIPRange) + dualstack, err := netutils.IsDualStackCIDRs([]*net.IPNet{&options.PrimaryServiceClusterIPRange, &options.SecondaryServiceClusterIPRange}) + if err != nil { + errs = append(errs, errors.New("error attempting to validate dualstack for --service-cluster-ip-range and --secondary-service-cluster-ip-range")) + } + + if !dualstack { + errs = append(errs, errors.New("--service-cluster-ip-range and --secondary-service-cluster-ip-range must be of different IP family")) + } + + // should be smallish sized cidr, this thing is kept in etcd + // bigger cidr (specially those offered by IPv6) will add no value + // significantly increase snapshotting time. + var ones, bits = options.SecondaryServiceClusterIPRange.Mask.Size() + if bits-ones > 20 { + errs = append(errs, errors.New("specified --secondary-service-cluster-ip-range is too large")) + } + } + return errs } diff --git a/cmd/kube-apiserver/app/options/validation_test.go b/cmd/kube-apiserver/app/options/validation_test.go new file mode 100644 index 00000000000..535149fdd8b --- /dev/null +++ b/cmd/kube-apiserver/app/options/validation_test.go @@ -0,0 +1,132 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "net" + "testing" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" +) + +func makeOptionsWithCIDRs(serviceCIDR string, secondaryServiceCIDR string) *ServerRunOptions { + value := serviceCIDR + if len(secondaryServiceCIDR) > 0 { + value = value + "," + secondaryServiceCIDR + } + + var primaryCIDR, secondaryCIDR net.IPNet + if len(serviceCIDR) > 0 { + _, cidr, _ := net.ParseCIDR(serviceCIDR) + if cidr != nil { + primaryCIDR = *(cidr) + } + } + + if len(secondaryServiceCIDR) > 0 { + _, cidr, _ := net.ParseCIDR(secondaryServiceCIDR) + if cidr != nil { + secondaryCIDR = *(cidr) + } + } + return &ServerRunOptions{ + ServiceClusterIPRanges: value, + PrimaryServiceClusterIPRange: primaryCIDR, + SecondaryServiceClusterIPRange: secondaryCIDR, + } +} + +func TestClusterSerivceIPRange(t *testing.T) { + testCases := []struct { + name string + options *ServerRunOptions + enableDualStack bool + expectErrors bool + }{ + { + name: "no service cidr", + expectErrors: true, + options: makeOptionsWithCIDRs("", ""), + enableDualStack: false, + }, + { + name: "only secondary service cidr, dual stack gate on", + expectErrors: true, + options: makeOptionsWithCIDRs("", "10.0.0.0/16"), + enableDualStack: true, + }, + { + name: "only secondary service cidr, dual stack gate off", + expectErrors: true, + options: makeOptionsWithCIDRs("", "10.0.0.0/16"), + enableDualStack: false, + }, + { + name: "primary and secondary are provided but not dual stack v4-v4", + expectErrors: true, + options: makeOptionsWithCIDRs("10.0.0.0/16", "11.0.0.0/16"), + enableDualStack: true, + }, + { + name: "primary and secondary are provided but not dual stack v6-v6", + expectErrors: true, + options: makeOptionsWithCIDRs("2000::/108", "3000::/108"), + enableDualStack: true, + }, + { + name: "valid dual stack with gate disabled", + expectErrors: true, + options: makeOptionsWithCIDRs("10.0.0.0/16", "3000::/108"), + enableDualStack: false, + }, + /* success cases */ + { + name: "valid primary", + expectErrors: false, + options: makeOptionsWithCIDRs("10.0.0.0/16", ""), + enableDualStack: false, + }, + { + name: "valid v4-v6 dual stack + gate on", + expectErrors: false, + options: makeOptionsWithCIDRs("10.0.0.0/16", "3000::/108"), + enableDualStack: true, + }, + { + name: "valid v6-v4 dual stack + gate on", + expectErrors: false, + options: makeOptionsWithCIDRs("3000::/108", "10.0.0.0/16"), + enableDualStack: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() + errs := validateClusterIPFlags(tc.options) + if len(errs) > 0 && !tc.expectErrors { + t.Errorf("expected no errors, errors found %+v", errs) + } + + if len(errs) == 0 && tc.expectErrors { + t.Errorf("expected errors, no errors found") + } + }) + } +} diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index b4b73cb863a..d64929577f8 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -306,11 +306,21 @@ func CreateKubeAPIServerConfig( PerConnectionBandwidthLimitBytesPerSec: s.MaxConnectionBytesPerSec, }) - serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.ServiceClusterIPRange) + serviceIPRange, apiServerServiceIP, lastErr := master.DefaultServiceIPRange(s.PrimaryServiceClusterIPRange) if lastErr != nil { return } + // defaults to empty range and ip + var secondaryServiceIPRange net.IPNet + // process secondary range only if provided by user + if s.SecondaryServiceClusterIPRange.IP != nil { + secondaryServiceIPRange, _, lastErr = master.DefaultServiceIPRange(s.SecondaryServiceClusterIPRange) + if lastErr != nil { + return + } + } + clientCA, lastErr := readCAorNil(s.Authentication.ClientCert.ClientCA) if lastErr != nil { return @@ -341,8 +351,10 @@ func CreateKubeAPIServerConfig( Tunneler: nodeTunneler, - ServiceIPRange: serviceIPRange, - APIServerServiceIP: apiServerServiceIP, + ServiceIPRange: serviceIPRange, + APIServerServiceIP: apiServerServiceIP, + SecondaryServiceIPRange: secondaryServiceIPRange, + APIServerServicePort: 443, ServiceNodePortRange: s.ServiceNodePortRange, @@ -548,11 +560,49 @@ func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) { if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing.DeprecatedInsecureServingOptions); err != nil { return options, err } - serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange) - if err != nil { - return options, fmt.Errorf("error determining service IP ranges: %v", err) + + // process s.ServiceClusterIPRange from list to Primary and Secondary + // we process secondary only if provided by user + + serviceClusterIPRangeList := strings.Split(s.ServiceClusterIPRanges, ",") + + var apiServerServiceIP net.IP + var serviceIPRange net.IPNet + var err error + // nothing provided by user, use default range (only applies to the Primary) + if len(serviceClusterIPRangeList) == 0 { + var primaryServiceClusterCIDR net.IPNet + serviceIPRange, apiServerServiceIP, err = master.DefaultServiceIPRange(primaryServiceClusterCIDR) + if err != nil { + return options, fmt.Errorf("error determining service IP ranges: %v", err) + } + s.PrimaryServiceClusterIPRange = serviceIPRange } - s.ServiceClusterIPRange = serviceIPRange + + if len(serviceClusterIPRangeList) > 0 { + _, primaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[0]) + if err != nil { + return options, fmt.Errorf("service-cluster-ip-range[0] is not a valid cidr") + } + + serviceIPRange, apiServerServiceIP, err = master.DefaultServiceIPRange(*(primaryServiceClusterCIDR)) + if err != nil { + return options, fmt.Errorf("error determining service IP ranges for primary service cidr: %v", err) + } + s.PrimaryServiceClusterIPRange = serviceIPRange + } + + // user provided at least two entries + if len(serviceClusterIPRangeList) > 1 { + _, secondaryServiceClusterCIDR, err := net.ParseCIDR(serviceClusterIPRangeList[1]) + if err != nil { + return options, fmt.Errorf("service-cluster-ip-range[1] is not an ip net") + } + + s.SecondaryServiceClusterIPRange = *(secondaryServiceClusterCIDR) + } + //note: validation asserts that the list is max of two dual stack entries + if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil { return options, fmt.Errorf("error creating self-signed certificates: %v", err) } diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 10b12c1b867..83139748257 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -133,8 +133,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } s.SecureServing.ServerCert.FixtureDirectory = path.Join(path.Dir(thisFile), "testdata") - s.ServiceClusterIPRange.IP = net.IPv4(10, 0, 0, 0) - s.ServiceClusterIPRange.Mask = net.CIDRMask(16, 32) + s.ServiceClusterIPRanges = "10.0.0.0/16" s.Etcd.StorageConfig = *storageConfig s.APIEnablement.RuntimeConfig.Set("api/all=true") diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 907b4786721..54026c3df85 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -45,6 +45,10 @@ import ( api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" + utilnet "k8s.io/utils/net" ) const ( @@ -218,6 +222,37 @@ func (e *EndpointController) addPod(obj interface{}) { } } +func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) { + if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { + return podToEndpointAddress(pod), nil + } + + // api-server service controller ensured that the service got the correct IP Family + // according to user setup, here we only need to match EndPoint IPs' family to service + // actual IP family. as in, we don't need to check service.IPFamily + + ipv6ClusterIP := utilnet.IsIPv6String(svc.Spec.ClusterIP) + for _, podIP := range pod.Status.PodIPs { + ipv6PodIP := utilnet.IsIPv6String(podIP.IP) + // same family? + // TODO (khenidak) when we remove the max of 2 PodIP limit from pods + // we will have to return multiple endpoint addresses + if ipv6ClusterIP == ipv6PodIP { + return &v1.EndpointAddress{ + IP: podIP.IP, + NodeName: &pod.Spec.NodeName, + TargetRef: &v1.ObjectReference{ + Kind: "Pod", + Namespace: pod.ObjectMeta.Namespace, + Name: pod.ObjectMeta.Name, + UID: pod.ObjectMeta.UID, + ResourceVersion: pod.ObjectMeta.ResourceVersion, + }}, nil + } + } + return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name) +} + func podToEndpointAddress(pod *v1.Pod) *v1.EndpointAddress { return &v1.EndpointAddress{ IP: pod.Status.PodIP, @@ -244,7 +279,9 @@ func podChanged(oldPod, newPod *v1.Pod) bool { return true } // Convert the pod to an EndpointAddress, clear inert fields, - // and see if they are the same. + // and see if they are the same. Even in a dual stack (multi pod IP) a pod + // will never change just one of its IPs, it will always change all. the below + // comparison to check if a pod has changed will still work newEndpointAddress := podToEndpointAddress(newPod) oldEndpointAddress := podToEndpointAddress(oldPod) // Ignore the ResourceVersion because it changes @@ -473,7 +510,14 @@ func (e *EndpointController) syncService(key string) error { continue } - epa := *podToEndpointAddress(pod) + ep, err := podToEndpointAddressForService(service, pod) + if err != nil { + // this will happen, if the cluster runs with some nodes configured as dual stack and some as not + // such as the case of an upgrade.. + klog.V(2).Infof("failed to find endpoint for service:%v with ClusterIP:%v on pod:%v with error:%v", service.Name, service.Spec.ClusterIP, pod.Name, err) + continue + } + epa := *ep hostname := pod.Spec.Hostname if len(hostname) > 0 && pod.Spec.Subdomain == service.Name && service.Namespace == pod.Namespace { diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 6c919626b3e..77683115d0a 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -31,15 +31,18 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" utiltesting "k8s.io/client-go/util/testing" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/api/testapi" endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/features" ) var alwaysReady = func() bool { return true } @@ -49,7 +52,7 @@ var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC) var triggerTimeString = triggerTime.Format(time.RFC3339Nano) var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano) -func testPod(namespace string, id int, nPorts int, isReady bool) *v1.Pod { +func testPod(namespace string, id int, nPorts int, isReady bool, makeDualstack bool) *v1.Pod { p := &v1.Pod{ TypeMeta: metav1.TypeMeta{APIVersion: "v1"}, ObjectMeta: metav1.ObjectMeta{ @@ -77,14 +80,24 @@ func testPod(namespace string, id int, nPorts int, isReady bool) *v1.Pod { p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, v1.ContainerPort{Name: fmt.Sprintf("port%d", j), ContainerPort: int32(8080 + j)}) } + if makeDualstack { + p.Status.PodIPs = []v1.PodIP{ + { + IP: p.Status.PodIP, + }, + { + IP: fmt.Sprintf("2000::%d", id), + }, + } + } return p } -func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) { +func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int, makeDualstack bool) { for i := 0; i < nPods+nNotReady; i++ { isReady := i < nPods - pod := testPod(namespace, i, nPorts, isReady) + pod := testPod(namespace, i, nPorts, isReady, makeDualstack) store.Add(pod) } } @@ -289,7 +302,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -330,7 +343,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "UDP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -371,7 +384,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "SCTP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -409,7 +422,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { }, Subsets: []v1.EndpointSubset{}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -446,7 +459,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { }, Subsets: []v1.EndpointSubset{}, }) - addPods(endpoints.podStore, ns, 0, 1, 1) + addPods(endpoints.podStore, ns, 0, 1, 1, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -483,7 +496,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { }, Subsets: []v1.EndpointSubset{}, }) - addPods(endpoints.podStore, ns, 1, 1, 1) + addPods(endpoints.podStore, ns, 1, 1, 1, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -524,7 +537,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -564,7 +577,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0) + addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, Spec: v1.ServiceSpec{ @@ -581,8 +594,9 @@ func TestSyncEndpointsItems(t *testing.T) { testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL, 0*time.Second) - addPods(endpoints.podStore, ns, 3, 2, 0) - addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found! + addPods(endpoints.podStore, ns, 3, 2, 0, false) + addPods(endpoints.podStore, "blah", 5, 2, 0, false) // make sure these aren't found! + endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -622,7 +636,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL, 0*time.Second) - addPods(endpoints.podStore, ns, 3, 2, 0) + addPods(endpoints.podStore, ns, 3, 2, 0, false) serviceLabels := map[string]string{"foo": "bar"} endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -682,7 +696,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{ @@ -731,7 +745,8 @@ func TestWaitsForAllInformersToBeSynced2(t *testing.T) { testServer, endpointsHandler := makeTestServer(t, ns) defer testServer.Close() endpoints := newController(testServer.URL, 0*time.Second) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) + service := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -782,7 +797,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -928,7 +943,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { Ports: nil, }, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequestCount(t, 1) data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ @@ -1033,11 +1048,146 @@ func TestShouldPodBeInEndpoints(t *testing.T) { } } } +func TestPodToEndpointAddressForService(t *testing.T) { + testCases := []struct { + name string + expectedEndPointIP string + enableDualStack bool + expectError bool + enableDualStackPod bool + + service v1.Service + }{ + { + name: "v4 service, in a single stack cluster", + expectedEndPointIP: "1.2.3.4", + + enableDualStack: false, + expectError: false, + enableDualStackPod: false, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + }, + }, + }, + { + name: "v4 service, in a dual stack cluster", + + expectedEndPointIP: "1.2.3.4", + enableDualStack: true, + expectError: false, + enableDualStackPod: true, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: "10.0.0.1", + }, + }, + }, + { + name: "v6 service, in a dual stack cluster. dual stack enabled", + expectedEndPointIP: "2000::0", + + enableDualStack: true, + expectError: false, + enableDualStackPod: true, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: "3000::1", + }, + }, + }, + + // in reality this is a misconfigured cluster + // i.e user is not using dual stack and have PodIP == v4 and ServiceIP==v6 + // we are testing that we will keep producing the expected behavior + { + name: "v6 service, in a v4 only cluster. dual stack disabled", + expectedEndPointIP: "1.2.3.4", + + enableDualStack: false, + expectError: false, + enableDualStackPod: false, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: "3000::1", + }, + }, + }, + { + name: "v6 service, in a v4 only cluster - dual stack enabled", + expectedEndPointIP: "1.2.3.4", + + enableDualStack: true, + expectError: true, + enableDualStackPod: false, + + service: v1.Service{ + Spec: v1.ServiceSpec{ + ClusterIP: "3000::1", + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() + podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + ns := "test" + addPods(podStore, ns, 1, 1, 0, tc.enableDualStackPod) + pods := podStore.List() + if len(pods) != 1 { + t.Fatalf("podStore size: expected: %d, got: %d", 1, len(pods)) + } + pod := pods[0].(*v1.Pod) + epa, err := podToEndpointAddressForService(&tc.service, pod) + + if err != nil && !tc.expectError { + t.Fatalf("podToEndpointAddressForService returned unexpected error %v", err) + } + + if err == nil && tc.expectError { + t.Fatalf("podToEndpointAddressForService should have returned error but it did not") + } + + if err != nil && tc.expectError { + return + } + + if epa.IP != tc.expectedEndPointIP { + t.Fatalf("IP: expected: %s, got: %s", pod.Status.PodIP, epa.IP) + } + if *(epa.NodeName) != pod.Spec.NodeName { + t.Fatalf("NodeName: expected: %s, got: %s", pod.Spec.NodeName, *(epa.NodeName)) + } + if epa.TargetRef.Kind != "Pod" { + t.Fatalf("TargetRef.Kind: expected: %s, got: %s", "Pod", epa.TargetRef.Kind) + } + if epa.TargetRef.Namespace != pod.ObjectMeta.Namespace { + t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Namespace, epa.TargetRef.Namespace) + } + if epa.TargetRef.Name != pod.ObjectMeta.Name { + t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.Name, epa.TargetRef.Name) + } + if epa.TargetRef.UID != pod.ObjectMeta.UID { + t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.UID, epa.TargetRef.UID) + } + if epa.TargetRef.ResourceVersion != pod.ObjectMeta.ResourceVersion { + t.Fatalf("TargetRef.Kind: expected: %s, got: %s", pod.ObjectMeta.ResourceVersion, epa.TargetRef.ResourceVersion) + } + }) + } + +} func TestPodToEndpointAddress(t *testing.T) { podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) ns := "test" - addPods(podStore, ns, 1, 1, 0) + addPods(podStore, ns, 1, 1, 0, false) pods := podStore.List() if len(pods) != 1 { t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) @@ -1071,7 +1221,7 @@ func TestPodToEndpointAddress(t *testing.T) { func TestPodChanged(t *testing.T) { podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) ns := "test" - addPods(podStore, ns, 1, 1, 0) + addPods(podStore, ns, 1, 1, 0, false) pods := podStore.List() if len(pods) != 1 { t.Errorf("podStore size: expected: %d, got: %d", 1, len(pods)) @@ -1102,6 +1252,80 @@ func TestPodChanged(t *testing.T) { } newPod.Status.PodIP = oldPod.Status.PodIP + /* dual stack tests */ + // primary changes, because changing IPs is done by changing sandbox + // case 1: add new secondrary IP + newPod.Status.PodIP = "1.1.3.1" + newPod.Status.PodIPs = []v1.PodIP{ + { + IP: "1.1.3.1", + }, + { + IP: "2000::1", + }, + } + if !podChanged(oldPod, newPod) { + t.Errorf("Expected pod to be changed with adding secondary IP") + } + // reset + newPod.Status.PodIPs = nil + newPod.Status.PodIP = oldPod.Status.PodIP + + // case 2: removing a secondary IP + saved := oldPod.Status.PodIP + oldPod.Status.PodIP = "1.1.3.1" + oldPod.Status.PodIPs = []v1.PodIP{ + { + IP: "1.1.3.1", + }, + { + IP: "2000::1", + }, + } + + newPod.Status.PodIP = "1.2.3.4" + newPod.Status.PodIPs = []v1.PodIP{ + { + IP: "1.2.3.4", + }, + } + + // reset + oldPod.Status.PodIPs = nil + newPod.Status.PodIPs = nil + oldPod.Status.PodIP = saved + newPod.Status.PodIP = saved + // case 3: change secondary + // case 2: removing a secondary IP + saved = oldPod.Status.PodIP + oldPod.Status.PodIP = "1.1.3.1" + oldPod.Status.PodIPs = []v1.PodIP{ + { + IP: "1.1.3.1", + }, + { + IP: "2000::1", + }, + } + + newPod.Status.PodIP = "1.2.3.4" + newPod.Status.PodIPs = []v1.PodIP{ + { + IP: "1.2.3.4", + }, + { + IP: "2000::2", + }, + } + + // reset + oldPod.Status.PodIPs = nil + newPod.Status.PodIPs = nil + oldPod.Status.PodIP = saved + newPod.Status.PodIP = saved + + /* end dual stack testing */ + newPod.ObjectMeta.Name = "wrong-name" if !podChanged(oldPod, newPod) { t.Errorf("Expected pod to be changed with pod name change") @@ -1203,7 +1427,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)}, Spec: v1.ServiceSpec{ @@ -1250,7 +1474,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) { Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}}, }}, }) - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)}, Spec: v1.ServiceSpec{ @@ -1298,7 +1522,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) { }}, }) // Neither pod nor service has trigger time, this should cause annotation to be cleared. - addPods(endpoints.podStore, ns, 1, 1, 0) + addPods(endpoints.podStore, ns, 1, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, Spec: v1.ServiceSpec{ @@ -1435,7 +1659,7 @@ func TestPodUpdatesBatching(t *testing.T) { go endpoints.Run(1, stopCh) - addPods(endpoints.podStore, ns, tc.podsCount, 1, 0) + addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, @@ -1568,7 +1792,7 @@ func TestPodAddsBatching(t *testing.T) { for i, add := range tc.adds { time.Sleep(add.delay) - p := testPod(ns, i, 1, true) + p := testPod(ns, i, 1, true, false) endpoints.podStore.Add(p) endpoints.addPod(p) } @@ -1679,7 +1903,7 @@ func TestPodDeleteBatching(t *testing.T) { go endpoints.Run(1, stopCh) - addPods(endpoints.podStore, ns, tc.podsCount, 1, 0) + addPods(endpoints.podStore, ns, tc.podsCount, 1, 0, false) endpoints.serviceStore.Add(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 3e90b84f692..36307def915 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -55,9 +55,12 @@ type Controller struct { EventClient corev1client.EventsGetter healthClient rest.Interface - ServiceClusterIPRegistry rangeallocation.RangeRegistry + ServiceClusterIPRegistry rangeallocation.RangeRegistry + ServiceClusterIPRange net.IPNet + SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry + SecondaryServiceClusterIPRange net.IPNet + ServiceClusterIPInterval time.Duration - ServiceClusterIPRange net.IPNet ServiceNodePortRegistry rangeallocation.RangeRegistry ServiceNodePortInterval time.Duration @@ -106,8 +109,11 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega SystemNamespaces: systemNamespaces, SystemNamespacesInterval: 1 * time.Minute, - ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator, - ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange, + ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator, + ServiceClusterIPRange: c.ExtraConfig.ServiceIPRange, + SecondaryServiceClusterIPRegistry: legacyRESTStorage.SecondaryServiceClusterIPAllocator, + SecondaryServiceClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange, + ServiceClusterIPInterval: 3 * time.Minute, ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator, @@ -148,7 +154,7 @@ func (c *Controller) Start() { klog.Errorf("Unable to remove old endpoints from kubernetes service: %v", err) } - repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry) + repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry) repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry) // run all of the controllers once prior to returning from Start. diff --git a/pkg/master/master.go b/pkg/master/master.go index 9806579334a..9443f647ec8 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -132,6 +132,13 @@ type ExtraConfig struct { ServiceIPRange net.IPNet // The IP address for the GenericAPIServer service (must be inside ServiceIPRange) APIServerServiceIP net.IP + + // dual stack services, the range represents an alternative IP range for service IP + // must be of different family than primary (ServiceIPRange) + SecondaryServiceIPRange net.IPNet + // the secondary IP address the GenericAPIServer service (must be inside SecondaryServiceIPRange) + SecondaryAPIServerServiceIP net.IP + // Port for the apiserver service. APIServerServicePort int @@ -323,6 +330,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) KubeletClientConfig: c.ExtraConfig.KubeletClientConfig, EventTTL: c.ExtraConfig.EventTTL, ServiceIPRange: c.ExtraConfig.ServiceIPRange, + SecondaryServiceIPRange: c.ExtraConfig.SecondaryServiceIPRange, ServiceNodePortRange: c.ExtraConfig.ServiceNodePortRange, LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig, ServiceAccountIssuer: c.ExtraConfig.ServiceAccountIssuer, diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 6a00feae183..9ece81f5602 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -77,8 +77,10 @@ type LegacyRESTStorageProvider struct { EventTTL time.Duration // ServiceIPRange is used to build cluster IPs for discovery. - ServiceIPRange net.IPNet - ServiceNodePortRange utilnet.PortRange + ServiceIPRange net.IPNet + // allocates ips for secondary service cidr in dual stack clusters + SecondaryServiceIPRange net.IPNet + ServiceNodePortRange utilnet.PortRange ServiceAccountIssuer serviceaccount.TokenGenerator ServiceAccountMaxExpiration time.Duration @@ -92,8 +94,9 @@ type LegacyRESTStorageProvider struct { // master.go for wiring controllers. // TODO remove this by running the controller as a poststarthook type LegacyRESTStorage struct { - ServiceClusterIPAllocator rangeallocation.RangeRegistry - ServiceNodePortAllocator rangeallocation.RangeRegistry + ServiceClusterIPAllocator rangeallocation.RangeRegistry + SecondaryServiceClusterIPAllocator rangeallocation.RangeRegistry + ServiceNodePortAllocator rangeallocation.RangeRegistry } func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) { @@ -216,6 +219,26 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi } restStorage.ServiceClusterIPAllocator = serviceClusterIPRegistry + // allocator for secondary service ip range + var secondaryServiceClusterIPAllocator ipallocator.Interface + if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && c.SecondaryServiceIPRange.IP != nil { + var secondaryServiceClusterIPRegistry rangeallocation.RangeRegistry + secondaryServiceClusterIPAllocator, err = ipallocator.NewAllocatorCIDRRange(&c.SecondaryServiceIPRange, func(max int, rangeSpec string) (allocator.Interface, error) { + mem := allocator.NewAllocationMap(max, rangeSpec) + // TODO etcdallocator package to return a storage interface via the storageFactory + etcd, err := serviceallocator.NewEtcd(mem, "/ranges/secondaryserviceips", api.Resource("serviceipallocations"), serviceStorageConfig) + if err != nil { + return nil, err + } + secondaryServiceClusterIPRegistry = etcd + return etcd, nil + }) + if err != nil { + return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, fmt.Errorf("cannot create cluster secondary IP allocator: %v", err) + } + restStorage.SecondaryServiceClusterIPAllocator = secondaryServiceClusterIPRegistry + } + var serviceNodePortRegistry rangeallocation.RangeRegistry serviceNodePortAllocator, err := portallocator.NewPortAllocatorCustom(c.ServiceNodePortRange, func(max int, rangeSpec string) (allocator.Interface, error) { mem := allocator.NewAllocationMap(max, rangeSpec) @@ -237,7 +260,13 @@ func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generi return LegacyRESTStorage{}, genericapiserver.APIGroupInfo{}, err } - serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage, endpointsStorage, podStorage.Pod, serviceClusterIPAllocator, serviceNodePortAllocator, c.ProxyTransport) + serviceRest, serviceRestProxy := servicestore.NewREST(serviceRESTStorage, + endpointsStorage, + podStorage.Pod, + serviceClusterIPAllocator, + secondaryServiceClusterIPAllocator, + serviceNodePortAllocator, + c.ProxyTransport) restStorageMap := map[string]rest.Storage{ "pods": podStorage.Pod, diff --git a/pkg/registry/core/service/ipallocator/allocator.go b/pkg/registry/core/service/ipallocator/allocator.go index 37c5741078e..722d299683c 100644 --- a/pkg/registry/core/service/ipallocator/allocator.go +++ b/pkg/registry/core/service/ipallocator/allocator.go @@ -32,6 +32,7 @@ type Interface interface { AllocateNext() (net.IP, error) Release(net.IP) error ForEach(func(net.IP)) + CIDR() net.IPNet // For testing Has(ip net.IP) bool diff --git a/pkg/registry/core/service/ipallocator/allocator_test.go b/pkg/registry/core/service/ipallocator/allocator_test.go index 5174ef6fcd0..e821a6a1016 100644 --- a/pkg/registry/core/service/ipallocator/allocator_test.go +++ b/pkg/registry/core/service/ipallocator/allocator_test.go @@ -69,6 +69,12 @@ func TestAllocate(t *testing.T) { if f := r.Free(); f != tc.free { t.Errorf("Test %s unexpected free %d", tc.name, f) } + + rCIDR := r.CIDR() + if rCIDR.String() != tc.cidr { + t.Errorf("allocator returned a different cidr") + } + if f := r.Used(); f != 0 { t.Errorf("Test %s unexpected used %d", tc.name, f) } diff --git a/pkg/registry/core/service/ipallocator/controller/repair.go b/pkg/registry/core/service/ipallocator/controller/repair.go index b2cb50d8136..e89af254121 100644 --- a/pkg/registry/core/service/ipallocator/controller/repair.go +++ b/pkg/registry/core/service/ipallocator/controller/repair.go @@ -34,6 +34,10 @@ import ( "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/registry/core/rangeallocation" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" + netutil "k8s.io/utils/net" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) // Repair is a controller loop that periodically examines all service ClusterIP allocations @@ -54,10 +58,14 @@ import ( type Repair struct { interval time.Duration serviceClient corev1client.ServicesGetter - network *net.IPNet - alloc rangeallocation.RangeRegistry - leaks map[string]int // counter per leaked IP - recorder record.EventRecorder + + network *net.IPNet + alloc rangeallocation.RangeRegistry + secondaryNetwork *net.IPNet + secondaryAlloc rangeallocation.RangeRegistry + + leaks map[string]int // counter per leaked IP + recorder record.EventRecorder } // How many times we need to detect a leak before we clean up. This is to @@ -66,7 +74,7 @@ const numRepairsBeforeLeakCleanup = 3 // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster // and generates informational warnings for a cluster that is not in sync. -func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair { +func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")}) recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"}) @@ -74,10 +82,14 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter return &Repair{ interval: interval, serviceClient: serviceClient, - network: network, - alloc: alloc, - leaks: map[string]int{}, - recorder: recorder, + + network: network, + alloc: alloc, + secondaryNetwork: secondaryNetwork, + secondaryAlloc: secondaryAlloc, + + leaks: map[string]int{}, + recorder: recorder, } } @@ -95,6 +107,29 @@ func (c *Repair) RunOnce() error { return retry.RetryOnConflict(retry.DefaultBackoff, c.runOnce) } +// selectAllocForIP returns an allocator for an IP based weather it belongs to the primary or the secondary allocator +func (c *Repair) selectAllocForIP(ip net.IP, primary ipallocator.Interface, secondary ipallocator.Interface) ipallocator.Interface { + if !c.shouldWorkOnSecondary() { + return primary + } + + cidr := secondary.CIDR() + if netutil.IsIPv6CIDR(&cidr) && netutil.IsIPv6(ip) { + return secondary + } + + return primary +} + +// shouldWorkOnSecondary returns true if the repairer should perform work for secondary network (dual stack) +func (c *Repair) shouldWorkOnSecondary() bool { + if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { + return false + } + + return c.secondaryNetwork != nil && c.secondaryNetwork.IP != nil +} + // runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs. func (c *Repair) runOnce() error { // TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read, @@ -107,10 +142,26 @@ func (c *Repair) runOnce() error { // If etcd server is not running we should wait for some time and fail only then. This is particularly // important when we start apiserver and etcd at the same time. var snapshot *api.RangeAllocation - err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + var secondarySnapshot *api.RangeAllocation + + var stored, secondaryStored ipallocator.Interface + var err, secondaryErr error + + err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { var err error snapshot, err = c.alloc.Get() - return err == nil, err + if err != nil { + return false, err + } + + if c.shouldWorkOnSecondary() { + secondarySnapshot, err = c.secondaryAlloc.Get() + if err != nil { + return false, err + } + } + + return true, nil }) if err != nil { return fmt.Errorf("unable to refresh the service IP block: %v", err) @@ -119,10 +170,19 @@ func (c *Repair) runOnce() error { if snapshot.Range == "" { snapshot.Range = c.network.String() } + + if c.shouldWorkOnSecondary() && secondarySnapshot.Range == "" { + secondarySnapshot.Range = c.secondaryNetwork.String() + } // Create an allocator because it is easy to use. - stored, err := ipallocator.NewFromSnapshot(snapshot) - if err != nil { - return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err) + + stored, err = ipallocator.NewFromSnapshot(snapshot) + if c.shouldWorkOnSecondary() { + secondaryStored, secondaryErr = ipallocator.NewFromSnapshot(secondarySnapshot) + } + + if err != nil || secondaryErr != nil { + return fmt.Errorf("unable to rebuild allocator from snapshots: %v", err) } // We explicitly send no resource version, since the resource version @@ -135,10 +195,20 @@ func (c *Repair) runOnce() error { return fmt.Errorf("unable to refresh the service IP block: %v", err) } - rebuilt, err := ipallocator.NewCIDRRange(c.network) + var rebuilt, secondaryRebuilt *ipallocator.Range + rebuilt, err = ipallocator.NewCIDRRange(c.network) if err != nil { return fmt.Errorf("unable to create CIDR range: %v", err) } + + if c.shouldWorkOnSecondary() { + secondaryRebuilt, err = ipallocator.NewCIDRRange(c.secondaryNetwork) + } + + if err != nil { + return fmt.Errorf("unable to create CIDR range: %v", err) + } + // Check every Service's ClusterIP, and rebuild the state as we think it should be. for _, svc := range list.Items { if !helper.IsServiceIPSet(&svc) { @@ -152,12 +222,15 @@ func (c *Repair) runOnce() error { runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace)) continue } + // mark it as in-use - switch err := rebuilt.Allocate(ip); err { + actualAlloc := c.selectAllocForIP(ip, rebuilt, secondaryRebuilt) + switch err := actualAlloc.Allocate(ip); err { case nil: - if stored.Has(ip) { + actualStored := c.selectAllocForIP(ip, stored, secondaryStored) + if actualStored.Has(ip) { // remove it from the old set, so we can find leaks - stored.Release(ip) + actualStored.Release(ip) } else { // cluster IP doesn't seem to be allocated c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotAllocated", "Cluster IP %s is not allocated; repairing", ip) @@ -174,14 +247,50 @@ func (c *Repair) runOnce() error { runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network)) case ipallocator.ErrFull: // somehow we are out of IPs - c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %s is full; you must widen the CIDR in order to create new services", c.network) - return fmt.Errorf("the service CIDR %s is full; you must widen the CIDR in order to create new services", c.network) + cidr := actualAlloc.CIDR() + c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %v is full; you must widen the CIDR in order to create new services", cidr) + return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", cidr) default: c.recorder.Eventf(&svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate cluster IP %s due to an unknown error", ip) return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err) } } + c.checkLeaked(stored, rebuilt) + if c.shouldWorkOnSecondary() { + c.checkLeaked(secondaryStored, secondaryRebuilt) + } + + // Blast the rebuilt state into storage. + err = c.saveSnapShot(rebuilt, c.alloc, snapshot) + if err != nil { + return err + } + + if c.shouldWorkOnSecondary() { + err := c.saveSnapShot(secondaryRebuilt, c.secondaryAlloc, secondarySnapshot) + if err != nil { + return nil + } + } + return nil +} + +func (c *Repair) saveSnapShot(rebuilt *ipallocator.Range, alloc rangeallocation.RangeRegistry, snapshot *api.RangeAllocation) error { + if err := rebuilt.Snapshot(snapshot); err != nil { + return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err) + } + if err := alloc.CreateOrUpdate(snapshot); err != nil { + if errors.IsConflict(err) { + return err + } + return fmt.Errorf("unable to persist the updated service IP allocations: %v", err) + } + + return nil +} + +func (c *Repair) checkLeaked(stored ipallocator.Interface, rebuilt *ipallocator.Range) { // Check for IPs that are left in the old set. They appear to have been leaked. stored.ForEach(func(ip net.IP) { count, found := c.leaks[ip.String()] @@ -203,15 +312,4 @@ func (c *Repair) runOnce() error { } }) - // Blast the rebuilt state into storage. - if err := rebuilt.Snapshot(snapshot); err != nil { - return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err) - } - if err := c.alloc.CreateOrUpdate(snapshot); err != nil { - if errors.IsConflict(err) { - return err - } - return fmt.Errorf("unable to persist the updated service IP allocations: %v", err) - } - return nil } diff --git a/pkg/registry/core/service/ipallocator/controller/repair_test.go b/pkg/registry/core/service/ipallocator/controller/repair_test.go index 26992176f6c..1b24bde17c0 100644 --- a/pkg/registry/core/service/ipallocator/controller/repair_test.go +++ b/pkg/registry/core/service/ipallocator/controller/repair_test.go @@ -27,6 +27,10 @@ import ( "k8s.io/client-go/kubernetes/fake" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" ) type mockRangeRegistry struct { @@ -56,7 +60,7 @@ func TestRepair(t *testing.T) { item: &api.RangeAllocation{Range: "192.168.1.0/24"}, } _, cidr, _ := net.ParseCIDR(ipregistry.item.Range) - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil) if err := r.RunOnce(); err != nil { t.Fatal(err) @@ -69,7 +73,7 @@ func TestRepair(t *testing.T) { item: &api.RangeAllocation{Range: "192.168.1.0/24"}, updateErr: fmt.Errorf("test error"), } - r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry) + r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil) if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") { t.Fatal(err) } @@ -100,7 +104,7 @@ func TestRepairLeak(t *testing.T) { }, } - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil) // Run through the "leak detection holdoff" loops. for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ { if err := r.RunOnce(); err != nil { @@ -176,7 +180,7 @@ func TestRepairWithExisting(t *testing.T) { Data: dst.Data, }, } - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil) if err := r.RunOnce(); err != nil { t.Fatal(err) } @@ -191,3 +195,342 @@ func TestRepairWithExisting(t *testing.T) { t.Errorf("unexpected ipallocator state: %d free", free) } } + +func makeRangeRegistry(t *testing.T, cidrRange string) *mockRangeRegistry { + _, cidr, _ := net.ParseCIDR(cidrRange) + previous, err := ipallocator.NewCIDRRange(cidr) + if err != nil { + t.Fatal(err) + } + + var dst api.RangeAllocation + err = previous.Snapshot(&dst) + if err != nil { + t.Fatal(err) + } + + return &mockRangeRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + Range: dst.Range, + Data: dst.Data, + }, + } +} + +func makeFakeClientSet() *fake.Clientset { + return fake.NewSimpleClientset() +} +func makeIPNet(cidr string) *net.IPNet { + _, net, _ := net.ParseCIDR(cidr) + return net +} +func TestShouldWorkOnSecondary(t *testing.T) { + testCases := []struct { + name string + enableDualStack bool + expectedResult bool + primaryNet *net.IPNet + secondaryNet *net.IPNet + }{ + { + name: "not a dual stack, primary only", + enableDualStack: false, + expectedResult: false, + primaryNet: makeIPNet("10.0.0.0/16"), + secondaryNet: nil, + }, + { + name: "not a dual stack, primary and secondary provided", + enableDualStack: false, + expectedResult: false, + primaryNet: makeIPNet("10.0.0.0/16"), + secondaryNet: makeIPNet("2000::/120"), + }, + { + name: "dual stack, primary only", + enableDualStack: true, + expectedResult: false, + primaryNet: makeIPNet("10.0.0.0/16"), + secondaryNet: nil, + }, + { + name: "dual stack, primary and secondary", + enableDualStack: true, + expectedResult: true, + primaryNet: makeIPNet("10.0.0.0/16"), + secondaryNet: makeIPNet("2000::/120"), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() + + fakeClient := makeFakeClientSet() + primaryRegistry := makeRangeRegistry(t, tc.primaryNet.String()) + var secondaryRegistery *mockRangeRegistry + + if tc.secondaryNet != nil { + secondaryRegistery = makeRangeRegistry(t, tc.secondaryNet.String()) + } + + repair := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), tc.primaryNet, primaryRegistry, tc.secondaryNet, secondaryRegistery) + if repair.shouldWorkOnSecondary() != tc.expectedResult { + t.Errorf("shouldWorkOnSecondary should be %v and found %v", tc.expectedResult, repair.shouldWorkOnSecondary()) + } + }) + } +} + +func TestRepairDualStack(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)() + + fakeClient := fake.NewSimpleClientset() + ipregistry := &mockRangeRegistry{ + item: &api.RangeAllocation{Range: "192.168.1.0/24"}, + } + secondaryIPRegistry := &mockRangeRegistry{ + item: &api.RangeAllocation{Range: "2000::/108"}, + } + + _, cidr, _ := net.ParseCIDR(ipregistry.item.Range) + _, secondaryCIDR, _ := net.ParseCIDR(secondaryIPRegistry.item.Range) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) + + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + if !ipregistry.updateCalled || ipregistry.updated == nil || ipregistry.updated.Range != cidr.String() || ipregistry.updated != ipregistry.item { + t.Errorf("unexpected ipregistry: %#v", ipregistry) + } + if !secondaryIPRegistry.updateCalled || secondaryIPRegistry.updated == nil || secondaryIPRegistry.updated.Range != secondaryCIDR.String() || secondaryIPRegistry.updated != secondaryIPRegistry.item { + t.Errorf("unexpected ipregistry: %#v", ipregistry) + } + + ipregistry = &mockRangeRegistry{ + item: &api.RangeAllocation{Range: "192.168.1.0/24"}, + updateErr: fmt.Errorf("test error"), + } + secondaryIPRegistry = &mockRangeRegistry{ + item: &api.RangeAllocation{Range: "2000::/108"}, + updateErr: fmt.Errorf("test error"), + } + + r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) + if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") { + t.Fatal(err) + } +} + +func TestRepairLeakDualStack(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)() + + _, cidr, _ := net.ParseCIDR("192.168.1.0/24") + previous, err := ipallocator.NewCIDRRange(cidr) + if err != nil { + t.Fatal(err) + } + + previous.Allocate(net.ParseIP("192.168.1.10")) + + _, secondaryCIDR, _ := net.ParseCIDR("2000::/108") + secondaryPrevious, err := ipallocator.NewCIDRRange(secondaryCIDR) + if err != nil { + t.Fatal(err) + } + secondaryPrevious.Allocate(net.ParseIP("2000::1")) + + var dst api.RangeAllocation + err = previous.Snapshot(&dst) + if err != nil { + t.Fatal(err) + } + + var secondaryDST api.RangeAllocation + err = secondaryPrevious.Snapshot(&secondaryDST) + if err != nil { + t.Fatal(err) + } + + fakeClient := fake.NewSimpleClientset() + + ipregistry := &mockRangeRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + Range: dst.Range, + Data: dst.Data, + }, + } + secondaryIPRegistry := &mockRangeRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + Range: secondaryDST.Range, + Data: secondaryDST.Data, + }, + } + + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) + // Run through the "leak detection holdoff" loops. + for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ { + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + after, err := ipallocator.NewFromSnapshot(ipregistry.updated) + if err != nil { + t.Fatal(err) + } + if !after.Has(net.ParseIP("192.168.1.10")) { + t.Errorf("expected ipallocator to still have leaked IP") + } + secondaryAfter, err := ipallocator.NewFromSnapshot(secondaryIPRegistry.updated) + if err != nil { + t.Fatal(err) + } + if !secondaryAfter.Has(net.ParseIP("2000::1")) { + t.Errorf("expected ipallocator to still have leaked IP") + } + } + // Run one more time to actually remove the leak. + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + + after, err := ipallocator.NewFromSnapshot(ipregistry.updated) + if err != nil { + t.Fatal(err) + } + if after.Has(net.ParseIP("192.168.1.10")) { + t.Errorf("expected ipallocator to not have leaked IP") + } + secondaryAfter, err := ipallocator.NewFromSnapshot(secondaryIPRegistry.updated) + if err != nil { + t.Fatal(err) + } + if secondaryAfter.Has(net.ParseIP("2000::1")) { + t.Errorf("expected ipallocator to not have leaked IP") + } +} + +func TestRepairWithExistingDualStack(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, true)() + _, cidr, _ := net.ParseCIDR("192.168.1.0/24") + previous, err := ipallocator.NewCIDRRange(cidr) + if err != nil { + t.Fatal(err) + } + + _, secondaryCIDR, _ := net.ParseCIDR("2000::/108") + secondaryPrevious, err := ipallocator.NewCIDRRange(secondaryCIDR) + if err != nil { + t.Fatal(err) + } + + var dst api.RangeAllocation + err = previous.Snapshot(&dst) + if err != nil { + t.Fatal(err) + } + + var secondaryDST api.RangeAllocation + err = secondaryPrevious.Snapshot(&secondaryDST) + if err != nil { + t.Fatal(err) + } + + fakeClient := fake.NewSimpleClientset( + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one"}, + Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "one", Name: "one-v6"}, + Spec: corev1.ServiceSpec{ClusterIP: "2000::1"}, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "two", Name: "two"}, + Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.100"}, + }, + &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "two", Name: "two-6"}, + Spec: corev1.ServiceSpec{ClusterIP: "2000::2"}, + }, + &corev1.Service{ // outside CIDR, will be dropped + ObjectMeta: metav1.ObjectMeta{Namespace: "three", Name: "three"}, + Spec: corev1.ServiceSpec{ClusterIP: "192.168.0.1"}, + }, + &corev1.Service{ // outside CIDR, will be dropped + ObjectMeta: metav1.ObjectMeta{Namespace: "three", Name: "three-v6"}, + Spec: corev1.ServiceSpec{ClusterIP: "3000::1"}, + }, + &corev1.Service{ // empty, ignored + ObjectMeta: metav1.ObjectMeta{Namespace: "four", Name: "four"}, + Spec: corev1.ServiceSpec{ClusterIP: ""}, + }, + &corev1.Service{ // duplicate, dropped + ObjectMeta: metav1.ObjectMeta{Namespace: "five", Name: "five"}, + Spec: corev1.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + &corev1.Service{ // duplicate, dropped + ObjectMeta: metav1.ObjectMeta{Namespace: "five", Name: "five-v6"}, + Spec: corev1.ServiceSpec{ClusterIP: "2000::2"}, + }, + + &corev1.Service{ // headless + ObjectMeta: metav1.ObjectMeta{Namespace: "six", Name: "six"}, + Spec: corev1.ServiceSpec{ClusterIP: "None"}, + }, + ) + + ipregistry := &mockRangeRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + Range: dst.Range, + Data: dst.Data, + }, + } + + secondaryIPRegistry := &mockRangeRegistry{ + item: &api.RangeAllocation{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "1", + }, + Range: secondaryDST.Range, + Data: secondaryDST.Data, + }, + } + + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) + if err := r.RunOnce(); err != nil { + t.Fatal(err) + } + after, err := ipallocator.NewFromSnapshot(ipregistry.updated) + if err != nil { + t.Fatal(err) + } + + if !after.Has(net.ParseIP("192.168.1.1")) || !after.Has(net.ParseIP("192.168.1.100")) { + t.Errorf("unexpected ipallocator state: %#v", after) + } + if free := after.Free(); free != 252 { + t.Errorf("unexpected ipallocator state: %d free (number of free ips is not 252)", free) + } + secondaryAfter, err := ipallocator.NewFromSnapshot(secondaryIPRegistry.updated) + if err != nil { + t.Fatal(err) + } + if !secondaryAfter.Has(net.ParseIP("2000::1")) || !secondaryAfter.Has(net.ParseIP("2000::2")) { + t.Errorf("unexpected ipallocator state: %#v", secondaryAfter) + } + if free := secondaryAfter.Free(); free != 65532 { + t.Errorf("unexpected ipallocator state: %d free (number of free ips is not 65532)", free) + } + +} diff --git a/pkg/registry/core/service/storage/rest.go b/pkg/registry/core/service/storage/rest.go index 6743b316d4b..c45af066f15 100644 --- a/pkg/registry/core/service/storage/rest.go +++ b/pkg/registry/core/service/storage/rest.go @@ -46,16 +46,22 @@ import ( registry "k8s.io/kubernetes/pkg/registry/core/service" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" + netutil "k8s.io/utils/net" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/kubernetes/pkg/features" ) // REST adapts a service registry into apiserver's RESTStorage model. type REST struct { - services ServiceStorage - endpoints EndpointsStorage - serviceIPs ipallocator.Interface - serviceNodePorts portallocator.Interface - proxyTransport http.RoundTripper - pods rest.Getter + services ServiceStorage + endpoints EndpointsStorage + serviceIPs ipallocator.Interface + secondaryServiceIPs ipallocator.Interface + defaultServiceIPFamily api.IPFamily + serviceNodePorts portallocator.Interface + proxyTransport http.RoundTripper + pods rest.Getter } // ServiceNodePort includes protocol and port number of a service NodePort. @@ -94,16 +100,29 @@ func NewREST( endpoints EndpointsStorage, pods rest.Getter, serviceIPs ipallocator.Interface, + secondaryServiceIPs ipallocator.Interface, serviceNodePorts portallocator.Interface, proxyTransport http.RoundTripper, ) (*REST, *registry.ProxyREST) { + // detect this cluster default Service IPFamily (ipfamily of --service-cluster-ip-range) + // we do it once here, to avoid having to do it over and over during ipfamily assignment + serviceIPFamily := api.IPv4Protocol + cidr := serviceIPs.CIDR() + if netutil.IsIPv6CIDR(&cidr) { + serviceIPFamily = api.IPv6Protocol + } + + klog.V(0).Infof("the default service ipfamily for this cluster is: %s", string(serviceIPFamily)) + rest := &REST{ - services: services, - endpoints: endpoints, - serviceIPs: serviceIPs, - serviceNodePorts: serviceNodePorts, - proxyTransport: proxyTransport, - pods: pods, + services: services, + endpoints: endpoints, + serviceIPs: serviceIPs, + secondaryServiceIPs: secondaryServiceIPs, + serviceNodePorts: serviceNodePorts, + defaultServiceIPFamily: serviceIPFamily, + proxyTransport: proxyTransport, + pods: pods, } return rest, ®istry.ProxyREST{Redirector: rest, ProxyTransport: proxyTransport} } @@ -160,6 +179,11 @@ func (rs *REST) Export(ctx context.Context, name string, opts metav1.ExportOptio func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { service := obj.(*api.Service) + // set the service ip family, if it was not already set + if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) && service.Spec.IPFamily == nil { + service.Spec.IPFamily = &rs.defaultServiceIPFamily + } + if err := rest.BeforeCreate(registry.Strategy, ctx, obj); err != nil { return nil, err } @@ -169,7 +193,8 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation defer func() { if releaseServiceIP { if helper.IsServiceIPSet(service) { - rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP)) + allocator := rs.getAllocatorByClusterIP(service) + allocator.Release(net.ParseIP(service.Spec.ClusterIP)) } } }() @@ -177,7 +202,8 @@ func (rs *REST) Create(ctx context.Context, obj runtime.Object, createValidation var err error if !dryrun.IsDryRun(options.DryRun) { if service.Spec.Type != api.ServiceTypeExternalName { - if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil { + allocator := rs.getAllocatorBySpec(service) + if releaseServiceIP, err = initClusterIP(service, allocator); err != nil { return nil, err } } @@ -256,7 +282,8 @@ func (rs *REST) Delete(ctx context.Context, id string, deleteValidation rest.Val func (rs *REST) releaseAllocatedResources(svc *api.Service) { if helper.IsServiceIPSet(svc) { - rs.serviceIPs.Release(net.ParseIP(svc.Spec.ClusterIP)) + allocator := rs.getAllocatorByClusterIP(svc) + allocator.Release(net.ParseIP(svc.Spec.ClusterIP)) } for _, nodePort := range collectServiceNodePorts(svc) { @@ -365,6 +392,7 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj } service := obj.(*api.Service) + if !rest.ValidNamespace(ctx, &service.ObjectMeta) { return nil, false, errors.NewConflict(api.Resource("services"), service.Namespace, fmt.Errorf("Service.Namespace does not match the provided context")) } @@ -379,7 +407,8 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj defer func() { if releaseServiceIP { if helper.IsServiceIPSet(service) { - rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP)) + allocator := rs.getAllocatorByClusterIP(service) + allocator.Release(net.ParseIP(service.Spec.ClusterIP)) } } }() @@ -389,15 +418,19 @@ func (rs *REST) Update(ctx context.Context, name string, objInfo rest.UpdatedObj if !dryrun.IsDryRun(options.DryRun) { // Update service from ExternalName to non-ExternalName, should initialize ClusterIP. + // Since we don't support changing the ip family of a service we don't need to handle + // oldService.Spec.ServiceIPFamily != service.Spec.ServiceIPFamily if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName { - if releaseServiceIP, err = initClusterIP(service, rs.serviceIPs); err != nil { + allocator := rs.getAllocatorBySpec(service) + if releaseServiceIP, err = initClusterIP(service, allocator); err != nil { return nil, false, err } } // Update service from non-ExternalName to ExternalName, should release ClusterIP if exists. if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName { if helper.IsServiceIPSet(oldService) { - rs.serviceIPs.Release(net.ParseIP(oldService.Spec.ClusterIP)) + allocator := rs.getAllocatorByClusterIP(service) + allocator.Release(net.ParseIP(oldService.Spec.ClusterIP)) } } } @@ -521,6 +554,35 @@ func (r *REST) ConvertToTable(ctx context.Context, object runtime.Object, tableO return r.services.ConvertToTable(ctx, object, tableOptions) } +// When allocating we always use BySpec, when releasing we always use ByClusterIP +func (r *REST) getAllocatorByClusterIP(service *api.Service) ipallocator.Interface { + if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) || r.secondaryServiceIPs == nil { + return r.serviceIPs + } + + secondaryAllocatorCIDR := r.secondaryServiceIPs.CIDR() + if netutil.IsIPv6String(service.Spec.ClusterIP) && netutil.IsIPv6CIDR(&secondaryAllocatorCIDR) { + return r.secondaryServiceIPs + } + + return r.serviceIPs +} + +func (r *REST) getAllocatorBySpec(service *api.Service) ipallocator.Interface { + if !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) || + service.Spec.IPFamily == nil || + r.secondaryServiceIPs == nil { + return r.serviceIPs + } + + secondaryAllocatorCIDR := r.secondaryServiceIPs.CIDR() + if *(service.Spec.IPFamily) == api.IPv6Protocol && netutil.IsIPv6CIDR(&secondaryAllocatorCIDR) { + return r.secondaryServiceIPs + } + + return r.serviceIPs +} + func isValidAddress(ctx context.Context, addr *api.EndpointAddress, pods rest.Getter) error { if addr.TargetRef == nil { return fmt.Errorf("Address has no target ref, skipping: %v", addr) @@ -603,11 +665,11 @@ func allocateHealthCheckNodePort(service *api.Service, nodePortOp *portallocator } // The return bool value indicates if a cluster IP is allocated successfully. -func initClusterIP(service *api.Service, serviceIPs ipallocator.Interface) (bool, error) { +func initClusterIP(service *api.Service, allocator ipallocator.Interface) (bool, error) { switch { case service.Spec.ClusterIP == "": // Allocate next available. - ip, err := serviceIPs.AllocateNext() + ip, err := allocator.AllocateNext() if err != nil { // TODO: what error should be returned here? It's not a // field-level validation failure (the field is valid), and it's @@ -618,7 +680,7 @@ func initClusterIP(service *api.Service, serviceIPs ipallocator.Interface) (bool return true, nil case service.Spec.ClusterIP != api.ClusterIPNone && service.Spec.ClusterIP != "": // Try to respect the requested IP. - if err := serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil { + if err := allocator.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil { // TODO: when validation becomes versioned, this gets more complicated. el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())} return false, errors.NewInvalid(api.Kind("Service"), service.Name, el) diff --git a/pkg/registry/core/service/storage/rest_test.go b/pkg/registry/core/service/storage/rest_test.go index c92d1502244..3a2abfd05c4 100644 --- a/pkg/registry/core/service/storage/rest_test.go +++ b/pkg/registry/core/service/storage/rest_test.go @@ -17,6 +17,7 @@ limitations under the License. package storage import ( + "bytes" "context" "net" "reflect" @@ -41,12 +42,15 @@ import ( "k8s.io/kubernetes/pkg/api/service" api "k8s.io/kubernetes/pkg/apis/core" - "k8s.io/kubernetes/pkg/apis/core/helper" endpointstore "k8s.io/kubernetes/pkg/registry/core/endpoint/storage" podstore "k8s.io/kubernetes/pkg/registry/core/pod/storage" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/portallocator" "k8s.io/kubernetes/pkg/registry/registrytest" + + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" ) // TODO(wojtek-t): Cleanup this file. @@ -167,11 +171,11 @@ func generateRandomNodePort() int32 { return int32(rand.IntnRange(30001, 30999)) } -func NewTestREST(t *testing.T, endpoints *api.EndpointsList) (*REST, *serviceStorage, *etcd3testing.EtcdTestServer) { - return NewTestRESTWithPods(t, endpoints, nil) +func NewTestREST(t *testing.T, endpoints *api.EndpointsList, dualStack bool) (*REST, *serviceStorage, *etcd3testing.EtcdTestServer) { + return NewTestRESTWithPods(t, endpoints, nil, dualStack) } -func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList) (*REST, *serviceStorage, *etcd3testing.EtcdTestServer) { +func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.PodList, dualStack bool) (*REST, *serviceStorage, *etcd3testing.EtcdTestServer) { etcdStorage, server := registrytest.NewEtcdStorage(t, "") serviceStorage := &serviceStorage{} @@ -216,6 +220,13 @@ func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.P if err != nil { t.Fatalf("cannot create CIDR Range %v", err) } + var rSecondary ipallocator.Interface + if dualStack { + rSecondary, err = ipallocator.NewCIDRRange(makeIPNet6(t)) + if err != nil { + t.Fatalf("cannot create CIDR Range(secondary) %v", err) + } + } portRange := utilnet.PortRange{Base: 30000, Size: 1000} portAllocator, err := portallocator.NewPortAllocator(portRange) @@ -223,7 +234,7 @@ func NewTestRESTWithPods(t *testing.T, endpoints *api.EndpointsList, pods *api.P t.Fatalf("cannot create port allocator %v", err) } - rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, r, portAllocator, nil) + rest, _ := NewREST(serviceStorage, endpointStorage, podStorage.Pod, r, rSecondary, portAllocator, nil) return rest, serviceStorage, server } @@ -235,6 +246,27 @@ func makeIPNet(t *testing.T) *net.IPNet { } return net } +func makeIPNet6(t *testing.T) *net.IPNet { + _, net, err := net.ParseCIDR("2000::/108") + if err != nil { + t.Error(err) + } + return net +} + +func ipnetGet(t *testing.T, secondary bool) *net.IPNet { + if secondary { + return makeIPNet6(t) + } + return makeIPNet(t) +} + +func allocGet(r *REST, secondary bool) ipallocator.Interface { + if secondary { + return r.secondaryServiceIPs + } + return r.serviceIPs +} func releaseServiceNodePorts(t *testing.T, ctx context.Context, svcName string, rest *REST, registry ServiceStorage) { obj, err := registry.Get(ctx, svcName, &metav1.GetOptions{}) @@ -256,90 +288,214 @@ func releaseServiceNodePorts(t *testing.T, ctx context.Context, svcName string, } func TestServiceRegistryCreate(t *testing.T) { - storage, registry, server := NewTestREST(t, nil) - defer server.Terminate(t) + ipv4Service := api.IPv4Protocol + ipv6Service := api.IPv6Protocol - svc := &api.Service{ - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, - Ports: []api.ServicePort{{ - Port: 6502, - Protocol: api.ProtocolTCP, - TargetPort: intstr.FromInt(6502), - }}, + testCases := []struct { + svc *api.Service + name string + enableDualStack bool + useSecondary bool + }{ + { + name: "Service IPFamily default cluster dualstack:off", + enableDualStack: false, + useSecondary: false, + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + }, + }, + { + name: "Service IPFamily:v4 dualstack off", + enableDualStack: false, + useSecondary: false, + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv4Service, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + }, + }, + { + name: "Service IPFamily:v4 dualstack on", + enableDualStack: true, + useSecondary: false, + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv4Service, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + }, + }, + { + name: "Service IPFamily:v6 dualstack on", + enableDualStack: true, + useSecondary: true, + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv6Service, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + }, }, } - ctx := genericapirequest.NewDefaultContext() - created_svc, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - created_service := created_svc.(*api.Service) - objMeta, err := meta.Accessor(created_service) - if err != nil { - t.Fatal(err) - } - if !metav1.HasObjectMetaSystemFieldValues(objMeta) { - t.Errorf("storage did not populate object meta field values") - } - if created_service.Name != "foo" { - t.Errorf("Expected foo, but got %v", created_service.Name) - } - if created_service.CreationTimestamp.IsZero() { - t.Errorf("Expected timestamp to be set, got: %v", created_service.CreationTimestamp) - } - if !makeIPNet(t).Contains(net.ParseIP(created_service.Spec.ClusterIP)) { - t.Errorf("Unexpected ClusterIP: %s", created_service.Spec.ClusterIP) - } - srv, err := registry.GetService(ctx, svc.Name, &metav1.GetOptions{}) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if srv == nil { - t.Errorf("Failed to find service: %s", svc.Name) + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() + storage, registry, server := NewTestREST(t, nil, tc.enableDualStack) + defer server.Terminate(t) + + ctx := genericapirequest.NewDefaultContext() + created_svc, err := storage.Create(ctx, tc.svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + created_service := created_svc.(*api.Service) + objMeta, err := meta.Accessor(created_service) + if err != nil { + t.Fatal(err) + } + if !metav1.HasObjectMetaSystemFieldValues(objMeta) { + t.Errorf("storage did not populate object meta field values") + } + if created_service.Name != "foo" { + t.Errorf("Expected foo, but got %v", created_service.Name) + } + if created_service.CreationTimestamp.IsZero() { + t.Errorf("Expected timestamp to be set, got: %v", created_service.CreationTimestamp) + } + allocNet := ipnetGet(t, tc.useSecondary) + + if !allocNet.Contains(net.ParseIP(created_service.Spec.ClusterIP)) { + t.Errorf("Unexpected ClusterIP: %s", created_service.Spec.ClusterIP) + } + srv, err := registry.GetService(ctx, tc.svc.Name, &metav1.GetOptions{}) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if srv == nil { + t.Errorf("Failed to find service: %s", tc.svc.Name) + } + }) } } func TestServiceRegistryCreateDryRun(t *testing.T) { - storage, registry, server := NewTestREST(t, nil) - defer server.Terminate(t) - - // Test dry run create request with cluster ip - svc := &api.Service{ - ObjectMeta: metav1.ObjectMeta{Name: "foo"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"bar": "baz"}, - SessionAffinity: api.ServiceAffinityNone, - Type: api.ServiceTypeClusterIP, - ClusterIP: "1.2.3.4", - Ports: []api.ServicePort{{ - Port: 6502, - Protocol: api.ProtocolTCP, - TargetPort: intstr.FromInt(6502), - }}, + ipv6Service := api.IPv6Protocol + testCases := []struct { + name string + svc *api.Service + enableDualStack bool + useSecondary bool + }{ + { + name: "v4 service", + enableDualStack: false, + useSecondary: false, + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + ClusterIP: "1.2.3.4", + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + }, + }, + { + name: "v6 service", + enableDualStack: true, + useSecondary: true, + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv6Service, + ClusterIP: "2000:0:0:0:0:0:0:1", + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + }, }, } - ctx := genericapirequest.NewDefaultContext() - _, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - if storage.serviceIPs.Has(net.ParseIP("1.2.3.4")) { - t.Errorf("unexpected side effect: ip allocated") - } - srv, err := registry.GetService(ctx, svc.Name, &metav1.GetOptions{}) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - if srv != nil { - t.Errorf("unexpected service found: %v", srv) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() + storage, registry, server := NewTestREST(t, nil, tc.enableDualStack) + defer server.Terminate(t) + + ctx := genericapirequest.NewDefaultContext() + _, err := storage.Create(ctx, tc.svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + alloc := allocGet(storage, tc.useSecondary) + + if alloc.Has(net.ParseIP(tc.svc.Spec.ClusterIP)) { + t.Errorf("unexpected side effect: ip allocated") + } + srv, err := registry.GetService(ctx, tc.svc.Name, &metav1.GetOptions{}) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if srv != nil { + t.Errorf("unexpected service found: %v", srv) + } + }) } +} + +func TestDryRunNodePort(t *testing.T) { + storage, registry, server := NewTestREST(t, nil, false) + defer server.Terminate(t) // Test dry run create request with a node port - svc = &api.Service{ + svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{ Selector: map[string]string{"bar": "baz"}, @@ -353,14 +509,16 @@ func TestServiceRegistryCreateDryRun(t *testing.T) { }}, }, } - _, err = storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}) + ctx := genericapirequest.NewDefaultContext() + + _, err := storage.Create(ctx, svc, rest.ValidateAllObjectFunc, &metav1.CreateOptions{DryRun: []string{metav1.DryRunAll}}) if err != nil { t.Fatalf("Unexpected error: %v", err) } if storage.serviceNodePorts.Has(30010) { t.Errorf("unexpected side effect: NodePort allocated") } - srv, err = registry.GetService(ctx, svc.Name, &metav1.GetOptions{}) + srv, err := registry.GetService(ctx, svc.Name, &metav1.GetOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -452,7 +610,8 @@ func TestServiceRegistryCreateDryRun(t *testing.T) { } func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) { - storage, registry, server := NewTestREST(t, nil) + + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) testCases := []struct { @@ -582,7 +741,7 @@ func TestServiceRegistryCreateMultiNodePortsService(t *testing.T) { } func TestServiceStorageValidatesCreate(t *testing.T) { - storage, _, server := NewTestREST(t, nil) + storage, _, server := NewTestREST(t, nil, false) defer server.Terminate(t) failureCases := map[string]api.Service{ "empty ID": { @@ -636,7 +795,7 @@ func TestServiceStorageValidatesCreate(t *testing.T) { func TestServiceRegistryUpdate(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) obj, err := registry.Create(ctx, &api.Service{ @@ -688,8 +847,9 @@ func TestServiceRegistryUpdate(t *testing.T) { } func TestServiceRegistryUpdateDryRun(t *testing.T) { + ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) obj, err := registry.Create(ctx, &api.Service{ @@ -854,7 +1014,7 @@ func TestServiceRegistryUpdateDryRun(t *testing.T) { func TestServiceStorageValidatesUpdate(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, @@ -907,7 +1067,7 @@ func TestServiceStorageValidatesUpdate(t *testing.T) { func TestServiceRegistryExternalService(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, @@ -946,7 +1106,7 @@ func TestServiceRegistryExternalService(t *testing.T) { func TestServiceRegistryDelete(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, @@ -969,7 +1129,7 @@ func TestServiceRegistryDelete(t *testing.T) { func TestServiceRegistryDeleteDryRun(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) // Test dry run delete request with cluster ip @@ -1035,7 +1195,7 @@ func TestServiceRegistryDeleteDryRun(t *testing.T) { func TestServiceRegistryDeleteExternal(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, @@ -1058,7 +1218,7 @@ func TestServiceRegistryDeleteExternal(t *testing.T) { func TestServiceRegistryUpdateExternalService(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) // Create non-external load balancer. @@ -1097,7 +1257,7 @@ func TestServiceRegistryUpdateExternalService(t *testing.T) { func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) // Create external load balancer. @@ -1135,7 +1295,7 @@ func TestServiceRegistryUpdateMultiPortExternalService(t *testing.T) { func TestServiceRegistryGet(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo"}, @@ -1211,7 +1371,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { }, }, } - storage, registry, server := NewTestRESTWithPods(t, endpoints, pods) + storage, registry, server := NewTestRESTWithPods(t, endpoints, pods, false) defer server.Terminate(t) for _, name := range []string{"foo", "bad"} { registry.Create(ctx, &api.Service{ @@ -1311,7 +1471,7 @@ func TestServiceRegistryResourceLocation(t *testing.T) { func TestServiceRegistryList(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) registry.Create(ctx, &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault}, @@ -1343,7 +1503,7 @@ func TestServiceRegistryList(t *testing.T) { } func TestServiceRegistryIPAllocation(t *testing.T) { - storage, _, server := NewTestREST(t, nil) + storage, _, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc1 := &api.Service{ @@ -1426,7 +1586,7 @@ func TestServiceRegistryIPAllocation(t *testing.T) { } func TestServiceRegistryIPReallocation(t *testing.T) { - storage, _, server := NewTestREST(t, nil) + storage, _, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc1 := &api.Service{ @@ -1482,7 +1642,7 @@ func TestServiceRegistryIPReallocation(t *testing.T) { } func TestServiceRegistryIPUpdate(t *testing.T) { - storage, _, server := NewTestREST(t, nil) + storage, _, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ @@ -1537,7 +1697,7 @@ func TestServiceRegistryIPUpdate(t *testing.T) { } func TestServiceRegistryIPLoadBalancer(t *testing.T) { - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ @@ -1577,7 +1737,7 @@ func TestServiceRegistryIPLoadBalancer(t *testing.T) { } func TestUpdateServiceWithConflictingNamespace(t *testing.T) { - storage, _, server := NewTestREST(t, nil) + storage, _, server := NewTestREST(t, nil, false) defer server.Terminate(t) service := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "not-default"}, @@ -1599,7 +1759,7 @@ func TestUpdateServiceWithConflictingNamespace(t *testing.T) { // and type is LoadBalancer. func TestServiceRegistryExternalTrafficHealthCheckNodePortAllocation(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "external-lb-esipp"}, @@ -1639,7 +1799,7 @@ func TestServiceRegistryExternalTrafficHealthCheckNodePortAllocation(t *testing. func TestServiceRegistryExternalTrafficHealthCheckNodePortUserAllocation(t *testing.T) { randomNodePort := generateRandomNodePort() ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "external-lb-esipp"}, @@ -1682,7 +1842,7 @@ func TestServiceRegistryExternalTrafficHealthCheckNodePortUserAllocation(t *test // Validate that the service creation fails when the requested port number is -1. func TestServiceRegistryExternalTrafficHealthCheckNodePortNegative(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, _, server := NewTestREST(t, nil) + storage, _, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "external-lb-esipp"}, @@ -1709,7 +1869,7 @@ func TestServiceRegistryExternalTrafficHealthCheckNodePortNegative(t *testing.T) // Validate that the health check nodePort is not allocated when ExternalTrafficPolicy is set to Global. func TestServiceRegistryExternalTrafficGlobal(t *testing.T) { ctx := genericapirequest.NewDefaultContext() - storage, registry, server := NewTestREST(t, nil) + storage, registry, server := NewTestREST(t, nil, false) defer server.Terminate(t) svc := &api.Service{ ObjectMeta: metav1.ObjectMeta{Name: "external-lb-esipp"}, @@ -1745,13 +1905,17 @@ func TestServiceRegistryExternalTrafficGlobal(t *testing.T) { } func TestInitClusterIP(t *testing.T) { - storage, _, server := NewTestREST(t, nil) - defer server.Terminate(t) - + ipv4Service := api.IPv4Protocol + ipv6Service := api.IPv6Protocol testCases := []struct { - name string - svc *api.Service - expectClusterIP bool + name string + svc *api.Service + + expectClusterIP bool + enableDualStack bool + allocateSpecificIP bool + useSecondaryAlloc bool + expectedAllocatedIP string }{ { name: "Allocate new ClusterIP", @@ -1769,6 +1933,27 @@ func TestInitClusterIP(t *testing.T) { }, }, expectClusterIP: true, + enableDualStack: false, + }, + { + name: "Allocate new ClusterIP-v6", + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv6Service, + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + }, + expectClusterIP: true, + useSecondaryAlloc: true, + enableDualStack: true, }, { name: "Allocate specified ClusterIP", @@ -1778,6 +1963,7 @@ func TestInitClusterIP(t *testing.T) { Selector: map[string]string{"bar": "baz"}, SessionAffinity: api.ServiceAffinityNone, Type: api.ServiceTypeClusterIP, + IPFamily: &ipv4Service, ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ Port: 6502, @@ -1786,7 +1972,33 @@ func TestInitClusterIP(t *testing.T) { }}, }, }, - expectClusterIP: true, + expectClusterIP: true, + allocateSpecificIP: true, + expectedAllocatedIP: "1.2.3.4", + enableDualStack: true, + }, + { + name: "Allocate specified ClusterIP-v6", + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + SessionAffinity: api.ServiceAffinityNone, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv6Service, + ClusterIP: "2000:0:0:0:0:0:0:1", + Ports: []api.ServicePort{{ + Port: 6502, + Protocol: api.ProtocolTCP, + TargetPort: intstr.FromInt(6502), + }}, + }, + }, + expectClusterIP: true, + allocateSpecificIP: true, + expectedAllocatedIP: "2000:0:0:0:0:0:0:1", + useSecondaryAlloc: true, + enableDualStack: true, }, { name: "Shouldn't allocate ClusterIP", @@ -1809,35 +2021,41 @@ func TestInitClusterIP(t *testing.T) { } for _, test := range testCases { - hasAllocatedIP, err := initClusterIP(test.svc, storage.serviceIPs) - if err != nil { - t.Errorf("%q: unexpected error: %v", test.name, err) - } + t.Run(test.name, func(t *testing.T) { - if hasAllocatedIP != test.expectClusterIP { - t.Errorf("%q: expected %v, but got %v", test.name, test.expectClusterIP, hasAllocatedIP) - } + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, test.enableDualStack)() - if test.expectClusterIP { - if !storage.serviceIPs.Has(net.ParseIP(test.svc.Spec.ClusterIP)) { - t.Errorf("%q: unexpected ClusterIP %q, out of range", test.name, test.svc.Spec.ClusterIP) + storage, _, server := NewTestREST(t, nil, test.enableDualStack) + defer server.Terminate(t) + + whichAlloc := allocGet(storage, test.useSecondaryAlloc) + hasAllocatedIP, err := initClusterIP(test.svc, whichAlloc) + if err != nil { + t.Errorf("unexpected error: %v", err) } - } - if test.name == "Allocate specified ClusterIP" && test.svc.Spec.ClusterIP != "1.2.3.4" { - t.Errorf("%q: expected ClusterIP %q, but got %q", test.name, "1.2.3.4", test.svc.Spec.ClusterIP) - } - - if hasAllocatedIP { - if helper.IsServiceIPSet(test.svc) { - storage.serviceIPs.Release(net.ParseIP(test.svc.Spec.ClusterIP)) + if hasAllocatedIP != test.expectClusterIP { + t.Errorf("expected %v, but got %v", test.expectClusterIP, hasAllocatedIP) } - } + + if test.expectClusterIP { + alloc := allocGet(storage, test.useSecondaryAlloc) + if !alloc.Has(net.ParseIP(test.svc.Spec.ClusterIP)) { + t.Errorf("unexpected ClusterIP %q, out of range", test.svc.Spec.ClusterIP) + } + } + + if test.allocateSpecificIP && test.expectedAllocatedIP != test.svc.Spec.ClusterIP { + t.Errorf(" expected ClusterIP %q, but got %q", test.expectedAllocatedIP, test.svc.Spec.ClusterIP) + } + + }) } + } func TestInitNodePorts(t *testing.T) { - storage, _, server := NewTestREST(t, nil) + storage, _, server := NewTestREST(t, nil, false) defer server.Terminate(t) nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false) defer nodePortOp.Finish() @@ -2019,7 +2237,7 @@ func TestInitNodePorts(t *testing.T) { } func TestUpdateNodePorts(t *testing.T) { - storage, _, server := NewTestREST(t, nil) + storage, _, server := NewTestREST(t, nil, false) defer server.Terminate(t) nodePortOp := portallocator.StartOperation(storage.serviceNodePorts, false) defer nodePortOp.Finish() @@ -2287,3 +2505,142 @@ func TestUpdateNodePorts(t *testing.T) { } } } + +func TestAllocGetters(t *testing.T) { + ipv4Service := api.IPv4Protocol + ipv6Service := api.IPv6Protocol + + testCases := []struct { + name string + + enableDualStack bool + specExpctPrimary bool + clusterIPExpectPrimary bool + + svc *api.Service + }{ + { + name: "spec:v4 ip:v4 dualstack:off", + + specExpctPrimary: true, + clusterIPExpectPrimary: true, + enableDualStack: false, + + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv4Service, + ClusterIP: "10.0.0.1", + }, + }, + }, + { + name: "spec:v4 ip:v4 dualstack:on", + + specExpctPrimary: true, + clusterIPExpectPrimary: true, + enableDualStack: true, + + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv4Service, + ClusterIP: "10.0.0.1", + }, + }, + }, + + { + name: "spec:v4 ip:v6 dualstack:on", + + specExpctPrimary: true, + clusterIPExpectPrimary: false, + enableDualStack: true, + + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv4Service, + ClusterIP: "2000::1", + }, + }, + }, + + { + name: "spec:v6 ip:v6 dualstack:on", + + specExpctPrimary: false, + clusterIPExpectPrimary: false, + enableDualStack: true, + + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv6Service, + ClusterIP: "2000::1", + }, + }, + }, + + { + name: "spec:v6 ip:v4 dualstack:on", + + specExpctPrimary: false, + clusterIPExpectPrimary: true, + enableDualStack: true, + + svc: &api.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"bar": "baz"}, + Type: api.ServiceTypeClusterIP, + IPFamily: &ipv6Service, + ClusterIP: "10.0.0.10", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)() + storage, _, server := NewTestREST(t, nil, tc.enableDualStack) + defer server.Terminate(t) + + if tc.enableDualStack && storage.secondaryServiceIPs == nil { + t.Errorf("storage must allocate secondary ServiceIPs allocator for dual stack") + return + } + + alloc := storage.getAllocatorByClusterIP(tc.svc) + if tc.clusterIPExpectPrimary && !bytes.Equal(alloc.CIDR().IP, storage.serviceIPs.CIDR().IP) { + t.Errorf("expected primary allocator, but primary allocator was not selected") + return + } + + if tc.enableDualStack && !tc.clusterIPExpectPrimary && !bytes.Equal(alloc.CIDR().IP, storage.secondaryServiceIPs.CIDR().IP) { + t.Errorf("expected secondary allocator, but secondary allocator was not selected") + } + + alloc = storage.getAllocatorBySpec(tc.svc) + if tc.specExpctPrimary && !bytes.Equal(alloc.CIDR().IP, storage.serviceIPs.CIDR().IP) { + t.Errorf("expected primary allocator, but primary allocator was not selected") + return + } + + if tc.enableDualStack && !tc.specExpctPrimary && !bytes.Equal(alloc.CIDR().IP, storage.secondaryServiceIPs.CIDR().IP) { + t.Errorf("expected secondary allocator, but secondary allocator was not selected") + } + + }) + } + +} diff --git a/test/e2e_node/services/apiserver.go b/test/e2e_node/services/apiserver.go index a86fe3e6daf..636b3d71f3e 100644 --- a/test/e2e_node/services/apiserver.go +++ b/test/e2e_node/services/apiserver.go @@ -53,7 +53,7 @@ func (a *APIServer) Start() error { if err != nil { return err } - o.ServiceClusterIPRange = *ipnet + o.ServiceClusterIPRanges = ipnet.String() o.AllowPrivileged = true o.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount", "TaintNodesByCondition"} errCh := make(chan error) diff --git a/test/integration/etcd/server.go b/test/integration/etcd/server.go index 2d8441b84c7..1ab178ae0c7 100644 --- a/test/integration/etcd/server.go +++ b/test/integration/etcd/server.go @@ -76,7 +76,7 @@ func StartRealMasterOrDie(t *testing.T, configFuncs ...func(*options.ServerRunOp kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()} kubeAPIServerOptions.Etcd.DefaultStorageMediaType = runtime.ContentTypeJSON // force json we can easily interpret the result in etcd - kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange + kubeAPIServerOptions.ServiceClusterIPRanges = defaultServiceClusterIPRange.String() kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"} kubeAPIServerOptions.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"} kubeAPIServerOptions.APIEnablement.RuntimeConfig["api/all"] = "true" diff --git a/test/integration/examples/apiserver_test.go b/test/integration/examples/apiserver_test.go index f3ce8862ed1..b93387f5616 100644 --- a/test/integration/examples/apiserver_test.go +++ b/test/integration/examples/apiserver_test.go @@ -101,7 +101,7 @@ func TestAggregatedAPIServer(t *testing.T) { kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{framework.GetEtcdURL()} - kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange + kubeAPIServerOptions.ServiceClusterIPRanges = defaultServiceClusterIPRange.String() kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"} diff --git a/test/integration/framework/test_server.go b/test/integration/framework/test_server.go index 32ba52f4093..a92e1d2f49c 100644 --- a/test/integration/framework/test_server.go +++ b/test/integration/framework/test_server.go @@ -92,7 +92,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup kubeAPIServerOptions.InsecureServing.BindPort = 0 kubeAPIServerOptions.Etcd.StorageConfig.Prefix = path.Join("/", uuid.New(), "registry") kubeAPIServerOptions.Etcd.StorageConfig.Transport.ServerList = []string{GetEtcdURL()} - kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange + kubeAPIServerOptions.ServiceClusterIPRanges = defaultServiceClusterIPRange.String() kubeAPIServerOptions.Authentication.RequestHeader.UsernameHeaders = []string{"X-Remote-User"} kubeAPIServerOptions.Authentication.RequestHeader.GroupHeaders = []string{"X-Remote-Group"} kubeAPIServerOptions.Authentication.RequestHeader.ExtraHeaderPrefixes = []string{"X-Remote-Extra-"}