Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-24 12:15:52 +00:00)

Merge pull request #99870 from swetharepakula/eps-ga-controller

Graduate EndpointSlice Controllers to GA

This commit is contained in commit d6a9061cb4.
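In summary, the change removes the EndpointSlice feature-gate checks from kube-apiserver validation and from the controller manager, and moves the EndpointSlice and EndpointSliceMirroring controllers from the discovery.k8s.io/v1beta1 API to discovery.k8s.io/v1. A rough sketch of what a consumer of the GA API looks like is below (illustrative only, not part of this commit; the kubeconfig path and namespace are placeholder assumptions):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from a local kubeconfig (path is a placeholder).
	config, err := clientcmd.BuildConfigFromFlags("", "/home/user/.kube/config")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// DiscoveryV1() replaces DiscoveryV1beta1() throughout the controllers in this diff.
	slices, err := client.DiscoveryV1().EndpointSlices("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, s := range slices.Items {
		fmt.Println(s.Name, s.AddressType, len(s.Endpoints))
	}
}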
@@ -55,12 +55,10 @@ func validateClusterIPFlags(options *ServerRunOptions) []error {
 	}
 
 	// Secondary IP validation
-	// while api-server dualstack bits does not have dependency on EndPointSlice, its
-	// a good idea to have validation consistent across all components (ControllerManager
-	// needs EndPointSlice + DualStack feature flags).
+	// ControllerManager needs DualStack feature flags
 	secondaryServiceClusterIPRangeUsed := (options.SecondaryServiceClusterIPRange.IP != nil)
-	if secondaryServiceClusterIPRangeUsed && (!utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) || !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)) {
-		errs = append(errs, fmt.Errorf("secondary service cluster-ip range(--service-cluster-ip-range[1]) can only be used if %v and %v feature is enabled", string(features.IPv6DualStack), string(features.EndpointSlice)))
+	if secondaryServiceClusterIPRangeUsed && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
+		errs = append(errs, fmt.Errorf("secondary service cluster-ip range(--service-cluster-ip-range[1]) can only be used if %v feature is enabled", string(features.IPv6DualStack)))
 	}
 
 	// note: While the cluster might be dualstack (i.e. pods with multiple IPs), the user may choose
@@ -52,13 +52,12 @@ func makeOptionsWithCIDRs(serviceCIDR string, secondaryServiceCIDR string) *Serv
 	}
 }
 
-func TestClusterSerivceIPRange(t *testing.T) {
+func TestClusterServiceIPRange(t *testing.T) {
 	testCases := []struct {
-		name                string
-		options             *ServerRunOptions
-		enableDualStack     bool
-		enableEndpointSlice bool
-		expectErrors        bool
+		name            string
+		options         *ServerRunOptions
+		enableDualStack bool
+		expectErrors    bool
 	}{
 		{
 			name: "no service cidr",
@@ -67,11 +66,10 @@ func TestClusterSerivceIPRange(t *testing.T) {
 			enableDualStack: false,
 		},
 		{
-			name:                "only secondary service cidr, dual stack gate on",
-			expectErrors:        true,
-			options:             makeOptionsWithCIDRs("", "10.0.0.0/16"),
-			enableDualStack:     true,
-			enableEndpointSlice: true,
+			name:            "only secondary service cidr, dual stack gate on",
+			expectErrors:    true,
+			options:         makeOptionsWithCIDRs("", "10.0.0.0/16"),
+			enableDualStack: true,
 		},
 		{
 			name: "only secondary service cidr, dual stack gate off",
@@ -80,18 +78,16 @@ func TestClusterSerivceIPRange(t *testing.T) {
 			enableDualStack: false,
 		},
 		{
-			name:                "primary and secondary are provided but not dual stack v4-v4",
-			expectErrors:        true,
-			options:             makeOptionsWithCIDRs("10.0.0.0/16", "11.0.0.0/16"),
-			enableDualStack:     true,
-			enableEndpointSlice: true,
+			name:            "primary and secondary are provided but not dual stack v4-v4",
+			expectErrors:    true,
+			options:         makeOptionsWithCIDRs("10.0.0.0/16", "11.0.0.0/16"),
+			enableDualStack: true,
 		},
 		{
-			name:                "primary and secondary are provided but not dual stack v6-v6",
-			expectErrors:        true,
-			options:             makeOptionsWithCIDRs("2000::/108", "3000::/108"),
-			enableDualStack:     true,
-			enableEndpointSlice: true,
+			name:            "primary and secondary are provided but not dual stack v6-v6",
+			expectErrors:    true,
+			options:         makeOptionsWithCIDRs("2000::/108", "3000::/108"),
+			enableDualStack: true,
 		},
 		{
 			name: "valid dual stack with gate disabled",
@@ -100,34 +96,24 @@ func TestClusterSerivceIPRange(t *testing.T) {
 			enableDualStack: false,
 		},
 		{
-			name:                "service cidr is too big",
-			expectErrors:        true,
-			options:             makeOptionsWithCIDRs("10.0.0.0/8", ""),
-			enableDualStack:     true,
-			enableEndpointSlice: true,
+			name:            "service cidr is too big",
+			expectErrors:    true,
+			options:         makeOptionsWithCIDRs("10.0.0.0/8", ""),
+			enableDualStack: true,
 		},
 		{
-			name:                "dual-stack secondary cidr too big",
-			expectErrors:        true,
-			options:             makeOptionsWithCIDRs("10.0.0.0/16", "3000::/64"),
-			enableDualStack:     true,
-			enableEndpointSlice: true,
+			name:            "dual-stack secondary cidr too big",
+			expectErrors:    true,
+			options:         makeOptionsWithCIDRs("10.0.0.0/16", "3000::/64"),
+			enableDualStack: true,
 		},
 		{
-			name:                "valid v6-v4 dual stack + gate on + endpointSlice gate is on",
-			expectErrors:        false,
-			options:             makeOptionsWithCIDRs("3000::/108", "10.0.0.0/16"),
-			enableDualStack:     true,
-			enableEndpointSlice: true,
+			name:            "valid v6-v4 dual stack + gate on + endpointSlice gate is on",
+			expectErrors:    false,
+			options:         makeOptionsWithCIDRs("3000::/108", "10.0.0.0/16"),
+			enableDualStack: true,
 		},
 
-		{
-			name:                "valid v4-v6 dual stack + gate on + endpointSlice is off",
-			expectErrors:        true,
-			options:             makeOptionsWithCIDRs("10.0.0.0/16", "3000::/108"),
-			enableDualStack:     true,
-			enableEndpointSlice: false,
-		},
 		/* success cases */
 		{
 			name: "valid primary",
@@ -136,25 +122,22 @@ func TestClusterSerivceIPRange(t *testing.T) {
 			enableDualStack: false,
 		},
 		{
-			name:                "valid v4-v6 dual stack + gate on",
-			expectErrors:        false,
-			options:             makeOptionsWithCIDRs("10.0.0.0/16", "3000::/108"),
-			enableDualStack:     true,
-			enableEndpointSlice: true,
+			name:            "valid v4-v6 dual stack + gate on",
+			expectErrors:    false,
+			options:         makeOptionsWithCIDRs("10.0.0.0/16", "3000::/108"),
+			enableDualStack: true,
 		},
 		{
-			name:                "valid v6-v4 dual stack + gate on",
-			expectErrors:        false,
-			options:             makeOptionsWithCIDRs("3000::/108", "10.0.0.0/16"),
-			enableDualStack:     true,
-			enableEndpointSlice: true,
+			name:            "valid v6-v4 dual stack + gate on",
+			expectErrors:    false,
+			options:         makeOptionsWithCIDRs("3000::/108", "10.0.0.0/16"),
+			enableDualStack: true,
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.IPv6DualStack, tc.enableDualStack)()
-			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSlice, tc.enableEndpointSlice)()
 			errs := validateClusterIPFlags(tc.options)
 			if len(errs) > 0 && !tc.expectErrors {
 				t.Errorf("expected no errors, errors found %+v", errs)
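As a quick usage note (hypothetical invocation; the package path assumes the standard Kubernetes repository layout), the renamed test can be exercised on its own with: go test -run TestClusterServiceIPRange ./cmd/kube-apiserver/app/options/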
@@ -110,8 +110,8 @@ func startNodeIpamController(ctx ControllerContext) (http.Handler, bool, error)
 		return nil, false, err
 	}
 
-	// failure: more than one cidr and dual stack is not enabled and/or endpoint slice is not enabled
-	if len(clusterCIDRs) > 1 && (!utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) || !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice)) {
+	// failure: more than one cidr and dual stack is not enabled
+	if len(clusterCIDRs) > 1 && !utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
 		return nil, false, fmt.Errorf("len of ClusterCIDRs==%v and dualstack or EndpointSlice feature is not enabled", len(clusterCIDRs))
 	}
 
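The check above now gates multiple cluster CIDRs only on the IPv6DualStack feature. A hedged sketch of how a comma-separated cluster-CIDR value might be split and sanity-checked before that validation runs is shown below (function name, flag format, and error wording are illustrative assumptions, not code from this commit):

package main

import (
	"fmt"
	"net"
	"strings"
)

// parseClusterCIDRs splits a comma-separated CIDR list and rejects malformed
// entries; dual-stack clusters pass at most one CIDR per IP family.
func parseClusterCIDRs(flag string) ([]*net.IPNet, error) {
	var cidrs []*net.IPNet
	for _, s := range strings.Split(flag, ",") {
		_, cidr, err := net.ParseCIDR(strings.TrimSpace(s))
		if err != nil {
			return nil, fmt.Errorf("invalid cluster CIDR %q: %v", s, err)
		}
		cidrs = append(cidrs, cidr)
	}
	if len(cidrs) > 2 {
		return nil, fmt.Errorf("at most two cluster CIDRs (one per IP family) are allowed, got %d", len(cidrs))
	}
	return cidrs, nil
}

func main() {
	cidrs, err := parseClusterCIDRs("10.244.0.0/16, fd00::/64")
	fmt.Println(len(cidrs), err)
}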
@@ -23,30 +23,16 @@ package app
 import (
 	"net/http"
 
-	discoveryv1beta1 "k8s.io/api/discovery/v1beta1"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
-	"k8s.io/klog/v2"
 	endpointslicecontroller "k8s.io/kubernetes/pkg/controller/endpointslice"
 	endpointslicemirroringcontroller "k8s.io/kubernetes/pkg/controller/endpointslicemirroring"
-	"k8s.io/kubernetes/pkg/features"
 )
 
 func startEndpointSliceController(ctx ControllerContext) (http.Handler, bool, error) {
-	if !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
-		klog.V(2).Infof("Not starting endpointslice-controller since EndpointSlice feature gate is disabled")
-		return nil, false, nil
-	}
-
-	if !ctx.AvailableResources[discoveryv1beta1.SchemeGroupVersion.WithResource("endpointslices")] {
-		klog.Warningf("Not starting endpointslice-controller since discovery.k8s.io/v1beta1 resources are not available")
-		return nil, false, nil
-	}
-
 	go endpointslicecontroller.NewController(
 		ctx.InformerFactory.Core().V1().Pods(),
 		ctx.InformerFactory.Core().V1().Services(),
 		ctx.InformerFactory.Core().V1().Nodes(),
-		ctx.InformerFactory.Discovery().V1beta1().EndpointSlices(),
+		ctx.InformerFactory.Discovery().V1().EndpointSlices(),
 		ctx.ComponentConfig.EndpointSliceController.MaxEndpointsPerSlice,
 		ctx.ClientBuilder.ClientOrDie("endpointslice-controller"),
 		ctx.ComponentConfig.EndpointSliceController.EndpointUpdatesBatchPeriod.Duration,
@@ -55,19 +41,9 @@ func startEndpointSliceController(ctx ControllerContext) (http.Handler, bool, er
 }
 
 func startEndpointSliceMirroringController(ctx ControllerContext) (http.Handler, bool, error) {
-	if !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
-		klog.V(2).Infof("Not starting endpointslicemirroring-controller since EndpointSlice feature gate is disabled")
-		return nil, false, nil
-	}
-
-	if !ctx.AvailableResources[discoveryv1beta1.SchemeGroupVersion.WithResource("endpointslices")] {
-		klog.Warningf("Not starting endpointslicemirroring-controller since discovery.k8s.io/v1beta1 resources are not available")
-		return nil, false, nil
-	}
-
 	go endpointslicemirroringcontroller.NewController(
 		ctx.InformerFactory.Core().V1().Endpoints(),
-		ctx.InformerFactory.Discovery().V1beta1().EndpointSlices(),
+		ctx.InformerFactory.Discovery().V1().EndpointSlices(),
 		ctx.InformerFactory.Core().V1().Services(),
 		ctx.ComponentConfig.EndpointSliceMirroringController.MirroringMaxEndpointsPerSubset,
 		ctx.ClientBuilder.ClientOrDie("endpointslicemirroring-controller"),
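With the feature gate gone, both controllers start unconditionally; the former guard on the discovery.k8s.io/v1beta1 resource being served is simply dropped. If such a guard were still wanted, it would key off the GA group instead. A hedged sketch, not part of this commit:

package main

import (
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// The controller manager keys its AvailableResources map by GroupVersionResource.
	available := map[schema.GroupVersionResource]bool{
		discoveryv1.SchemeGroupVersion.WithResource("endpointslices"): true,
	}

	gvr := discoveryv1.SchemeGroupVersion.WithResource("endpointslices")
	if !available[gvr] {
		fmt.Println("endpointslices not served; skipping controller start")
		return
	}
	fmt.Println("endpointslices available via", gvr.GroupVersion())
}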
@@ -19,7 +19,7 @@ package endpointslice
 import (
 	"sort"
 
-	discovery "k8s.io/api/discovery/v1beta1"
+	discovery "k8s.io/api/discovery/v1"
 	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
 )
 
@@ -23,18 +23,18 @@ import (
 	"golang.org/x/time/rate"
 
 	v1 "k8s.io/api/core/v1"
-	discovery "k8s.io/api/discovery/v1beta1"
+	discovery "k8s.io/api/discovery/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	coreinformers "k8s.io/client-go/informers/core/v1"
-	discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1"
+	discoveryinformers "k8s.io/client-go/informers/discovery/v1"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
 	corelisters "k8s.io/client-go/listers/core/v1"
-	discoverylisters "k8s.io/client-go/listers/discovery/v1beta1"
+	discoverylisters "k8s.io/client-go/listers/discovery/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/workqueue"
@@ -87,7 +87,7 @@ func NewController(podInformer coreinformers.PodInformer,
 	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})
 
 	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
-		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_slice_controller", client.DiscoveryV1beta1().RESTClient().GetRateLimiter())
+		ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_slice_controller", client.DiscoveryV1().RESTClient().GetRateLimiter())
 	}
 
 	endpointslicemetrics.RegisterMetrics()
@ -26,7 +26,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@ -71,7 +71,7 @@ func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clients
|
||||
informerFactory.Core().V1().Pods(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
nodeInformer,
|
||||
informerFactory.Discovery().V1beta1().EndpointSlices(),
|
||||
informerFactory.Discovery().V1().EndpointSlices(),
|
||||
int32(100),
|
||||
client,
|
||||
batchPeriod)
|
||||
@ -83,7 +83,7 @@ func newController(nodeNames []string, batchPeriod time.Duration) (*fake.Clients
|
||||
|
||||
return client, &endpointSliceController{
|
||||
esController,
|
||||
informerFactory.Discovery().V1beta1().EndpointSlices().Informer().GetStore(),
|
||||
informerFactory.Discovery().V1().EndpointSlices().Informer().GetStore(),
|
||||
informerFactory.Core().V1().Nodes().Informer().GetStore(),
|
||||
informerFactory.Core().V1().Pods().Informer().GetStore(),
|
||||
informerFactory.Core().V1().Services().Informer().GetStore(),
|
||||
@ -134,7 +134,7 @@ func TestSyncServiceWithSelector(t *testing.T) {
|
||||
standardSyncService(t, esController, ns, serviceName)
|
||||
expectActions(t, client.Actions(), 1, "create", "endpointslices")
|
||||
|
||||
sliceList, err := client.DiscoveryV1beta1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
|
||||
sliceList, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
|
||||
assert.Nil(t, err, "Expected no error fetching endpoint slices")
|
||||
assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices")
|
||||
slice := sliceList.Items[0]
|
||||
@ -201,7 +201,7 @@ func TestSyncServicePodSelection(t *testing.T) {
|
||||
expectActions(t, client.Actions(), 1, "create", "endpointslices")
|
||||
|
||||
// an endpoint slice should be created, it should only reference pod1 (not pod2)
|
||||
slices, err := client.DiscoveryV1beta1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
|
||||
slices, err := client.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
|
||||
assert.Nil(t, err, "Expected no error fetching endpoint slices")
|
||||
assert.Len(t, slices.Items, 1, "Expected 1 endpoint slices")
|
||||
slice := slices.Items[0]
|
||||
@ -283,7 +283,7 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error adding EndpointSlice: %v", err)
|
||||
}
|
||||
_, err = client.DiscoveryV1beta1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
_, err = client.DiscoveryV1().EndpointSlices(ns).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error creating EndpointSlice: %v", err)
|
||||
}
|
||||
@ -451,7 +451,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"10.0.0.1"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
{
|
||||
@ -460,7 +459,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"10.0.0.2"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
},
|
||||
@ -566,7 +564,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"fd08::5678:0000:0000:9abc:def0"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
},
|
||||
@ -672,7 +669,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"10.0.0.1"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
{
|
||||
@ -683,7 +679,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"10.0.0.2"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
},
|
||||
@ -788,7 +783,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"10.0.0.1"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
},
|
||||
@ -895,7 +889,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"10.0.0.1"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
{
|
||||
@ -906,7 +899,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"10.0.0.2"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod1"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
},
|
||||
@ -1011,7 +1003,6 @@ func TestSyncService(t *testing.T) {
|
||||
},
|
||||
Addresses: []string{"10.0.0.1"},
|
||||
TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
},
|
||||
},
|
||||
@@ -1038,14 +1029,14 @@ func TestSyncService(t *testing.T) {
 
 			// last action should be to create endpoint slice
 			expectActions(t, client.Actions(), 1, "create", "endpointslices")
-			sliceList, err := client.DiscoveryV1beta1().EndpointSlices(testcase.service.Namespace).List(context.TODO(), metav1.ListOptions{})
+			sliceList, err := client.DiscoveryV1().EndpointSlices(testcase.service.Namespace).List(context.TODO(), metav1.ListOptions{})
 			assert.Nil(t, err, "Expected no error fetching endpoint slices")
 			assert.Len(t, sliceList.Items, 1, "Expected 1 endpoint slices")
 
 			// ensure all attributes of endpoint slice match expected state
 			slice := sliceList.Items[0]
 			assert.Equal(t, slice.Annotations["endpoints.kubernetes.io/last-change-trigger-time"], creationTimestamp.Format(time.RFC3339Nano))
-			assert.EqualValues(t, testcase.expectedEndpointPorts, slice.Ports)
+			assert.ElementsMatch(t, testcase.expectedEndpointPorts, slice.Ports)
 			assert.ElementsMatch(t, testcase.expectedEndpoints, slice.Endpoints)
 		})
 	}
@ -19,8 +19,8 @@ package endpointslice
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
|
@ -19,8 +19,8 @@ package endpointslice
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
@ -19,7 +19,7 @@ package metrics
|
||||
import (
|
||||
"testing"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
|
||||
)
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -97,7 +97,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
|
||||
// delete those which are of addressType that is no longer supported
|
||||
// by the service
|
||||
for _, sliceToDelete := range slicesToDelete {
|
||||
err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Delete(context.TODO(), sliceToDelete.Name, metav1.DeleteOptions{})
|
||||
err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), sliceToDelete.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", sliceToDelete.Name, service.Namespace, service.Name, err))
|
||||
} else {
|
||||
@ -265,7 +265,7 @@ func (r *reconciler) finalize(
|
||||
if service.DeletionTimestamp == nil {
|
||||
for _, endpointSlice := range slicesToCreate {
|
||||
addTriggerTimeAnnotation(endpointSlice, triggerTime)
|
||||
createdSlice, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
createdSlice, err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
// If the namespace is terminating, creates will continue to fail. Simply drop the item.
|
||||
if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
|
||||
@ -280,7 +280,7 @@ func (r *reconciler) finalize(
|
||||
|
||||
for _, endpointSlice := range slicesToUpdate {
|
||||
addTriggerTimeAnnotation(endpointSlice, triggerTime)
|
||||
updatedSlice, err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
|
||||
updatedSlice, err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)
|
||||
}
|
||||
@ -289,7 +289,7 @@ func (r *reconciler) finalize(
|
||||
}
|
||||
|
||||
for _, endpointSlice := range slicesToDelete {
|
||||
err := r.client.DiscoveryV1beta1().EndpointSlices(service.Namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
|
||||
err := r.client.DiscoveryV1().EndpointSlices(service.Namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)
|
||||
}
|
||||
@ -336,7 +336,7 @@ func (r *reconciler) reconcileByPortMapping(
|
||||
newEndpoints = append(newEndpoints, *got)
|
||||
// If existing version of endpoint doesn't match desired version
|
||||
// set endpointUpdated to ensure endpoint changes are persisted.
|
||||
if !endpointsEqualBeyondHash(got, &endpoint) {
|
||||
if !endpointutil.EndpointsEqualBeyondHash(got, &endpoint) {
|
||||
endpointUpdated = true
|
||||
}
|
||||
// once an endpoint has been placed/found in a slice, it no
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@ -126,12 +126,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -152,12 +148,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -183,11 +175,7 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -211,12 +199,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -229,12 +213,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -253,12 +233,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -271,12 +247,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -297,12 +269,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -315,12 +283,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -341,12 +305,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -369,12 +329,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -396,12 +352,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1234::5678:0000:0000:9abc:def0"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -413,12 +365,8 @@ func TestReconcile1Pod(t *testing.T) {
|
||||
{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: namespace,
|
||||
@ -512,7 +460,7 @@ func TestReconcile1EndpointSlice(t *testing.T) {
|
||||
svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
|
||||
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
|
||||
|
||||
_, createErr := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice1, metav1.CreateOptions{})
|
||||
_, createErr := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice1, metav1.CreateOptions{})
|
||||
assert.Nil(t, createErr, "Expected no error creating endpoint slice")
|
||||
|
||||
numActionsBefore := len(client.Actions())
|
||||
@ -1281,7 +1229,7 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
|
||||
}
|
||||
|
||||
// Add EndpointSlice that can be updated.
|
||||
esToUpdate, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), &discovery.EndpointSlice{
|
||||
esToUpdate, err := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "to-update",
|
||||
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
||||
@ -1297,7 +1245,7 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
|
||||
esToUpdate.Endpoints = []discovery.Endpoint{{Addresses: []string{"10.2.3.4"}}}
|
||||
|
||||
// Add EndpointSlice that can be deleted.
|
||||
esToDelete, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), &discovery.EndpointSlice{
|
||||
esToDelete, err := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "to-delete",
|
||||
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
||||
@ -1470,7 +1418,7 @@ func portsAndAddressTypeEqual(slice1, slice2 discovery.EndpointSlice) bool {
|
||||
func createEndpointSlices(t *testing.T, client *fake.Clientset, namespace string, endpointSlices []*discovery.EndpointSlice) {
|
||||
t.Helper()
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
_, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
_, err := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error creating Endpoint Slice, got: %v", err)
|
||||
}
|
||||
@ -1479,7 +1427,7 @@ func createEndpointSlices(t *testing.T, client *fake.Clientset, namespace string
|
||||
|
||||
func fetchEndpointSlices(t *testing.T, client *fake.Clientset, namespace string) []discovery.EndpointSlice {
|
||||
t.Helper()
|
||||
fetchedSlices, err := client.DiscoveryV1beta1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{})
|
||||
fetchedSlices, err := client.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error fetching Endpoint Slices, got: %v", err)
|
||||
return []discovery.EndpointSlice{}
|
||||
|
@@ -22,7 +22,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	v1 "k8s.io/api/core/v1"
-	discovery "k8s.io/api/discovery/v1beta1"
+	discovery "k8s.io/api/discovery/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -41,23 +41,6 @@ import (
 
 // podToEndpoint returns an Endpoint object generated from a Pod, a Node, and a Service for a particular addressType.
 func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service, addressType discovery.AddressType) discovery.Endpoint {
-	// Build out topology information. This is currently limited to hostname,
-	// zone, and region, but this will be expanded in the future.
-	topology := map[string]string{}
-
-	if node != nil {
-		topologyLabels := []string{
-			"topology.kubernetes.io/zone",
-			"topology.kubernetes.io/region",
-		}
-
-		for _, topologyLabel := range topologyLabels {
-			if node.Labels[topologyLabel] != "" {
-				topology[topologyLabel] = node.Labels[topologyLabel]
-			}
-		}
-	}
-
 	serving := podutil.IsPodReady(pod)
 	terminating := pod.DeletionTimestamp != nil
 	// For compatibility reasons, "ready" should never be "true" if a pod is terminatng, unless
@@ -83,12 +66,14 @@ func podToEndpoint(pod *corev1.Pod, node *corev1.Node, service *corev1.Service,
 	}
 
 	if pod.Spec.NodeName != "" {
-		topology["kubernetes.io/hostname"] = pod.Spec.NodeName
-		ep.Topology = topology
 
 		ep.NodeName = &pod.Spec.NodeName
 	}
 
+	if node != nil && node.Labels[corev1.LabelTopologyZone] != "" {
+		zone := node.Labels[corev1.LabelTopologyZone]
+		ep.Zone = &zone
+	}
+
 	if endpointutil.ShouldSetHostname(pod, service) {
 		ep.Hostname = &pod.Spec.Hostname
 	}
@@ -147,25 +132,6 @@ func getEndpointAddresses(podStatus corev1.PodStatus, service *corev1.Service, a
 	return addresses
 }
 
-// endpointsEqualBeyondHash returns true if endpoints have equal attributes
-// but excludes equality checks that would have already been covered with
-// endpoint hashing (see hashEndpoint func for more info).
-func endpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
-	if !apiequality.Semantic.DeepEqual(ep1.Topology, ep2.Topology) {
-		return false
-	}
-
-	if boolPtrChanged(ep1.Conditions.Ready, ep2.Conditions.Ready) {
-		return false
-	}
-
-	if objectRefPtrChanged(ep1.TargetRef, ep2.TargetRef) {
-		return false
-	}
-
-	return true
-}
-
 // newEndpointSlice returns an EndpointSlice generated from a service and
 // endpointMeta.
 func newEndpointSlice(service *corev1.Service, endpointMeta *endpointMeta) *discovery.EndpointSlice {
@@ -198,29 +164,6 @@ func getEndpointSlicePrefix(serviceName string) string {
 	return prefix
 }
 
-// boolPtrChanged returns true if a set of bool pointers have different values.
-func boolPtrChanged(ptr1, ptr2 *bool) bool {
-	if (ptr1 == nil) != (ptr2 == nil) {
-		return true
-	}
-	if ptr1 != nil && ptr2 != nil && *ptr1 != *ptr2 {
-		return true
-	}
-	return false
-}
-
-// objectRefPtrChanged returns true if a set of object ref pointers have
-// different values.
-func objectRefPtrChanged(ref1, ref2 *corev1.ObjectReference) bool {
-	if (ref1 == nil) != (ref2 == nil) {
-		return true
-	}
-	if ref1 != nil && ref2 != nil && !apiequality.Semantic.DeepEqual(*ref1, *ref2) {
-		return true
-	}
-	return false
-}
-
 // ownedBy returns true if the provided EndpointSlice is owned by the provided
 // Service.
 func ownedBy(endpointSlice *discovery.EndpointSlice, svc *corev1.Service) bool {
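The per-endpoint Topology map used above is replaced by the dedicated Zone and NodeName fields of the v1 API. A hedged, illustrative sketch of how the old topology keys map onto the new fields (not code from this commit):

package main

import (
	"fmt"

	discoveryv1 "k8s.io/api/discovery/v1"
)

// endpointFromTopology shows the mapping: the "kubernetes.io/hostname" key
// becomes NodeName and the zone label becomes Zone.
func endpointFromTopology(addresses []string, topology map[string]string) discoveryv1.Endpoint {
	ep := discoveryv1.Endpoint{Addresses: addresses}
	if v, ok := topology["kubernetes.io/hostname"]; ok {
		hostname := v
		ep.NodeName = &hostname
	}
	if v, ok := topology["topology.kubernetes.io/zone"]; ok {
		zone := v
		ep.Zone = &zone
	}
	return ep
}

func main() {
	ep := endpointFromTopology([]string{"10.0.0.1"}, map[string]string{
		"kubernetes.io/hostname":      "node-1",
		"topology.kubernetes.io/zone": "us-central1-a",
	})
	fmt.Println(*ep.NodeName, *ep.Zone)
}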
@ -24,7 +24,7 @@ import (
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@ -260,7 +260,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -278,7 +277,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -296,7 +294,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(false)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -314,7 +311,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -333,12 +329,8 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
@ -356,12 +348,8 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
expectedEndpoint: discovery.Endpoint{
|
||||
Addresses: []string{"1.2.3.4"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
@ -380,12 +368,8 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
Addresses: []string{"1.2.3.5"},
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
Hostname: &readyPodHostname.Spec.Hostname,
|
||||
Topology: map[string]string{
|
||||
"kubernetes.io/hostname": "node-1",
|
||||
"topology.kubernetes.io/zone": "us-central1-a",
|
||||
"topology.kubernetes.io/region": "us-central1",
|
||||
},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Zone: utilpointer.StringPtr("us-central1-a"),
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Namespace: ns,
|
||||
@ -406,7 +390,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(false),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -427,7 +410,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -450,7 +432,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
Serving: utilpointer.BoolPtr(true),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -471,7 +452,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
Conditions: discovery.EndpointConditions{
|
||||
Ready: utilpointer.BoolPtr(false),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
@ -494,7 +474,6 @@ func TestPodToEndpoint(t *testing.T) {
|
||||
Serving: utilpointer.BoolPtr(false),
|
||||
Terminating: utilpointer.BoolPtr(true),
|
||||
},
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
TargetRef: &v1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
|
@ -19,7 +19,7 @@ package endpointslicemirroring
|
||||
import (
|
||||
"sort"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
|
||||
)
|
||||
|
||||
|
@ -19,8 +19,8 @@ package endpointslicemirroring
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
|
@ -19,8 +19,8 @@ package endpointslicemirroring
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
@ -23,18 +23,18 @@ import (
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
coreinformers "k8s.io/client-go/informers/core/v1"
|
||||
discoveryinformers "k8s.io/client-go/informers/discovery/v1beta1"
|
||||
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
discoverylisters "k8s.io/client-go/listers/discovery/v1beta1"
|
||||
discoverylisters "k8s.io/client-go/listers/discovery/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
@ -79,7 +79,7 @@ func NewController(endpointsInformer coreinformers.EndpointsInformer,
|
||||
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"})
|
||||
|
||||
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
|
||||
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_slice_mirroring_controller", client.DiscoveryV1beta1().RESTClient().GetRateLimiter())
|
||||
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_slice_mirroring_controller", client.DiscoveryV1().RESTClient().GetRateLimiter())
|
||||
}
|
||||
|
||||
metrics.RegisterMetrics()
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
@ -52,7 +52,7 @@ func newController(batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMi
|
||||
|
||||
esController := NewController(
|
||||
informerFactory.Core().V1().Endpoints(),
|
||||
informerFactory.Discovery().V1beta1().EndpointSlices(),
|
||||
informerFactory.Discovery().V1().EndpointSlices(),
|
||||
informerFactory.Core().V1().Services(),
|
||||
int32(1000),
|
||||
client,
|
||||
@ -65,7 +65,7 @@ func newController(batchPeriod time.Duration) (*fake.Clientset, *endpointSliceMi
|
||||
return client, &endpointSliceMirroringController{
|
||||
esController,
|
||||
informerFactory.Core().V1().Endpoints().Informer().GetStore(),
|
||||
informerFactory.Discovery().V1beta1().EndpointSlices().Informer().GetStore(),
|
||||
informerFactory.Discovery().V1().EndpointSlices().Informer().GetStore(),
|
||||
informerFactory.Core().V1().Services().Informer().GetStore(),
|
||||
}
|
||||
}
|
||||
@ -229,7 +229,7 @@ func TestSyncEndpoints(t *testing.T) {
|
||||
for _, epSlice := range tc.endpointSlices {
|
||||
epSlice.Namespace = namespace
|
||||
esController.endpointSliceStore.Add(epSlice)
|
||||
_, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), epSlice, metav1.CreateOptions{})
|
||||
_, err := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), epSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error creating EndpointSlice, got %v", err)
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ package metrics
|
||||
import (
|
||||
"testing"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
|
||||
)
|
||||
|
@ -21,7 +21,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
@ -230,7 +230,7 @@ func (r *reconciler) finalize(endpoints *corev1.Endpoints, slices slicesByAction
|
||||
// be deleted.
|
||||
recycleSlices(&slices)
|
||||
|
||||
epsClient := r.client.DiscoveryV1beta1().EndpointSlices(endpoints.Namespace)
|
||||
epsClient := r.client.DiscoveryV1().EndpointSlices(endpoints.Namespace)
|
||||
|
||||
// Don't create more EndpointSlices if corresponding Endpoints resource is
|
||||
// being deleted.
|
||||
@ -276,7 +276,7 @@ func (r *reconciler) deleteEndpoints(namespace, name string, endpointSlices []*d
|
||||
r.metricsCache.DeleteEndpoints(types.NamespacedName{Namespace: namespace, Name: name})
|
||||
var errs []error
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
err := r.client.DiscoveryV1beta1().EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
|
||||
err := r.client.DiscoveryV1().EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
@ -314,7 +314,7 @@ func totalChanges(existingSlice *discovery.EndpointSlice, desiredSet endpointSet
|
||||
|
||||
// If existing version of endpoint doesn't match desired version
|
||||
// increment number of endpoints to be updated.
|
||||
if !endpointsEqualBeyondHash(got, &endpoint) {
|
||||
if !endpointutil.EndpointsEqualBeyondHash(got, &endpoint) {
|
||||
totals.updated++
|
||||
}
|
||||
}
|
||||
|
@ -17,8 +17,8 @@ limitations under the License.
|
||||
package endpointslicemirroring
|
||||
|
||||
import (
|
||||
"k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
)
|
||||
|
||||
// slicesByAction includes lists of slices to create, update, or delete.
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
"testing"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
@ -375,12 +375,12 @@ func TestReconcile(t *testing.T) {
|
||||
Endpoints: []discovery.Endpoint{{
|
||||
Addresses: []string{"10.0.0.1"},
|
||||
Hostname: utilpointer.StringPtr("pod-1"),
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-1"},
|
||||
NodeName: utilpointer.StringPtr("node-1"),
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
}, {
|
||||
Addresses: []string{"10.0.0.2"},
|
||||
Hostname: utilpointer.StringPtr("pod-2"),
|
||||
Topology: map[string]string{"kubernetes.io/hostname": "node-2"},
|
||||
NodeName: utilpointer.StringPtr("node-2"),
|
||||
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
|
||||
}},
|
||||
}},
|
||||
@ -656,7 +656,7 @@ func TestReconcile(t *testing.T) {
|
||||
discovery.LabelServiceName: endpoints.Name,
|
||||
discovery.LabelManagedBy: controllerName,
|
||||
}
|
||||
_, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), epSlice, metav1.CreateOptions{})
|
||||
_, err := client.DiscoveryV1().EndpointSlices(namespace).Create(context.TODO(), epSlice, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error creating EndpointSlice, got %v", err)
|
||||
}
|
||||
@ -846,11 +846,11 @@ func expectMatchingAddresses(t *testing.T, epSubset corev1.EndpointSubset, esEnd
|
||||
}
|
||||
|
||||
if expectedEndpoint.epAddress.NodeName != nil {
|
||||
topologyHostname, ok := endpoint.Topology["kubernetes.io/hostname"]
|
||||
if !ok {
|
||||
t.Errorf("Expected topology[kubernetes.io/hostname] to be set")
|
||||
} else if *expectedEndpoint.epAddress.NodeName != topologyHostname {
|
||||
t.Errorf("Expected topology[kubernetes.io/hostname] to be %s, got %s", *expectedEndpoint.epAddress.NodeName, topologyHostname)
|
||||
if endpoint.NodeName == nil {
|
||||
t.Errorf("Expected nodeName to be set")
|
||||
}
|
||||
if *expectedEndpoint.epAddress.NodeName != *endpoint.NodeName {
|
||||
t.Errorf("Expected nodeName to be %s, got %s", *expectedEndpoint.epAddress.NodeName, *endpoint.NodeName)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -858,7 +858,7 @@ func expectMatchingAddresses(t *testing.T, epSubset corev1.EndpointSubset, esEnd
|
||||
|
||||
func fetchEndpointSlices(t *testing.T, client *fake.Clientset, namespace string) []discovery.EndpointSlice {
|
||||
t.Helper()
|
||||
fetchedSlices, err := client.DiscoveryV1beta1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{
|
||||
fetchedSlices, err := client.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), metav1.ListOptions{
|
||||
LabelSelector: discovery.LabelManagedBy + "=" + controllerName,
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -22,8 +22,7 @@ import (
"strings"

corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -62,25 +61,6 @@ func getAddressType(address string) *discovery.AddressType {
return &addressType
}

// endpointsEqualBeyondHash returns true if endpoints have equal attributes
// but excludes equality checks that would have already been covered with
// endpoint hashing (see hashEndpoint func for more info).
func endpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
if !apiequality.Semantic.DeepEqual(ep1.Topology, ep2.Topology) {
return false
}

if !boolPtrEqual(ep1.Conditions.Ready, ep2.Conditions.Ready) {
return false
}

if !objectRefPtrEqual(ep1.TargetRef, ep2.TargetRef) {
return false
}

return true
}

// newEndpointSlice returns an EndpointSlice generated from an Endpoints
// resource, ports, and address type.
func newEndpointSlice(endpoints *corev1.Endpoints, ports []discovery.EndpointPort, addrType discovery.AddressType, sliceName string) *discovery.EndpointSlice {
@ -135,9 +115,6 @@ func addressToEndpoint(address corev1.EndpointAddress, ready bool) *discovery.En
}

if address.NodeName != nil {
endpoint.Topology = map[string]string{
"kubernetes.io/hostname": *address.NodeName,
}
endpoint.NodeName = address.NodeName
}
if address.Hostname != "" {
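The hunk above swaps the v1beta1-style "kubernetes.io/hostname" Topology entry for the NodeName field that discovery.k8s.io/v1 exposes directly. As a rough sketch of that mapping (the standalone toV1Endpoint helper below is illustrative only and not part of this change):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	utilpointer "k8s.io/utils/pointer"
)

// toV1Endpoint is a hypothetical helper mirroring what addressToEndpoint does
// after this change: NodeName is set directly instead of writing a
// "kubernetes.io/hostname" entry into a Topology map.
func toV1Endpoint(address corev1.EndpointAddress, ready bool) discovery.Endpoint {
	ep := discovery.Endpoint{
		Addresses:  []string{address.IP},
		Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(ready)},
		TargetRef:  address.TargetRef,
	}
	if address.NodeName != nil {
		// v1: first-class field, no Topology map needed.
		ep.NodeName = address.NodeName
	}
	if address.Hostname != "" {
		ep.Hostname = utilpointer.StringPtr(address.Hostname)
	}
	return ep
}

func main() {
	addr := corev1.EndpointAddress{IP: "10.0.0.1", NodeName: utilpointer.StringPtr("node-1")}
	fmt.Printf("%+v\n", toV1Endpoint(addr, true))
}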
@ -163,29 +140,6 @@ func epPortsToEpsPorts(epPorts []corev1.EndpointPort) []discovery.EndpointPort {
return epsPorts
}

// boolPtrEqual returns true if a set of bool pointers have equivalent values.
func boolPtrEqual(ptr1, ptr2 *bool) bool {
if (ptr1 == nil) != (ptr2 == nil) {
return false
}
if ptr1 != nil && ptr2 != nil && *ptr1 != *ptr2 {
return false
}
return true
}

// objectRefPtrEqual returns true if a set of object ref pointers have
// equivalent values.
func objectRefPtrEqual(ref1, ref2 *corev1.ObjectReference) bool {
if (ref1 == nil) != (ref2 == nil) {
return false
}
if ref1 != nil && ref2 != nil && !apiequality.Semantic.DeepEqual(*ref1, *ref2) {
return false
}
return true
}

// getServiceFromDeleteAction parses a Service resource from a delete
// action.
func getServiceFromDeleteAction(obj interface{}) *corev1.Service {
@ -22,7 +22,7 @@ import (

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -97,9 +97,6 @@ func TestAddressToEndpoint(t *testing.T) {
Conditions: discovery.EndpointConditions{
Ready: utilpointer.BoolPtr(true),
},
Topology: map[string]string{
"kubernetes.io/hostname": "node-abc",
},
TargetRef: &v1.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
@ -25,7 +25,8 @@ import (
"sync"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
@ -275,3 +276,60 @@ func (sl portsInOrder) Less(i, j int) bool {
h2 := DeepHashObjectToString(sl[j])
return h1 < h2
}

// EndpointsEqualBeyondHash returns true if endpoints have equal attributes
// but excludes equality checks that would have already been covered with
// endpoint hashing (see hashEndpoint func for more info).
func EndpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool {
if stringPtrChanged(ep1.NodeName, ep2.NodeName) {
return false
}

if stringPtrChanged(ep1.Zone, ep2.Zone) {
return false
}

if boolPtrChanged(ep1.Conditions.Ready, ep2.Conditions.Ready) {
return false
}

if objectRefPtrChanged(ep1.TargetRef, ep2.TargetRef) {
return false
}

return true
}

// boolPtrChanged returns true if a set of bool pointers have different values.
func boolPtrChanged(ptr1, ptr2 *bool) bool {
if (ptr1 == nil) != (ptr2 == nil) {
return true
}
if ptr1 != nil && ptr2 != nil && *ptr1 != *ptr2 {
return true
}
return false
}

// objectRefPtrChanged returns true if a set of object ref pointers have
// different values.
func objectRefPtrChanged(ref1, ref2 *v1.ObjectReference) bool {
if (ref1 == nil) != (ref2 == nil) {
return true
}
if ref1 != nil && ref2 != nil && !apiequality.Semantic.DeepEqual(*ref1, *ref2) {
return true
}
return false
}

// stringPtrChanged returns true if a set of string pointers have different values.
func stringPtrChanged(ptr1, ptr2 *string) bool {
if (ptr1 == nil) != (ptr2 == nil) {
return true
}
if ptr1 != nil && ptr2 != nil && *ptr1 != *ptr2 {
return true
}
return false
}
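To make the "beyond hash" idea concrete, here is a small self-contained sketch (not code from this change; beyondHashEqual is a simplified stand-in): two endpoints with identical addresses hash the same, yet still differ in attributes such as NodeName, so the slice must still be rewritten.

package main

import (
	"fmt"
	"reflect"

	discovery "k8s.io/api/discovery/v1"
	utilpointer "k8s.io/utils/pointer"
)

// beyondHashEqual only checks the attributes that the endpoint hash does not
// cover: NodeName, Zone, readiness, and the target reference.
func beyondHashEqual(a, b *discovery.Endpoint) bool {
	return reflect.DeepEqual(a.NodeName, b.NodeName) &&
		reflect.DeepEqual(a.Zone, b.Zone) &&
		reflect.DeepEqual(a.Conditions.Ready, b.Conditions.Ready) &&
		reflect.DeepEqual(a.TargetRef, b.TargetRef)
}

func main() {
	current := &discovery.Endpoint{
		Addresses: []string{"10.0.0.1"},
		NodeName:  utilpointer.StringPtr("node-1"),
	}
	desired := &discovery.Endpoint{
		// Same address, so the hashed identity would match ...
		Addresses: []string{"10.0.0.1"},
		// ... but the endpoint moved to another node.
		NodeName: utilpointer.StringPtr("node-2"),
	}
	fmt.Println(beyondHashEqual(current, desired)) // false: an update is needed
}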
@ -91,7 +91,6 @@ import (
"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
"k8s.io/kubernetes/pkg/controlplane/tunneler"
"k8s.io/kubernetes/pkg/features"
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/routes"
@ -250,10 +249,7 @@ type Instance struct {

func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler {
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
var endpointSliceClient discoveryclient.EndpointSlicesGetter
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
endpointSliceClient = discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
}
endpointSliceClient := discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)

return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointsAdapter)
@ -265,10 +261,7 @@ func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {

func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
var endpointSliceClient discoveryclient.EndpointSlicesGetter
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
endpointSliceClient = discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
}
endpointSliceClient := discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)

ttl := c.ExtraConfig.MasterEndpointReconcileTTL
@ -466,6 +466,8 @@ const (

// owner: @robscott @freehan
// alpha: v1.16
// beta: v1.18
// ga: v1.21
//
// Enable Endpoint Slices for more scalable Service endpoints.
EndpointSlice featuregate.Feature = "EndpointSlice"
@ -775,7 +777,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
NonPreemptingPriority: {Default: true, PreRelease: featuregate.Beta},
PodOverhead: {Default: true, PreRelease: featuregate.Beta},
IPv6DualStack: {Default: true, PreRelease: featuregate.Beta},
EndpointSlice: {Default: true, PreRelease: featuregate.Beta},
EndpointSlice: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25
EndpointSliceProxying: {Default: true, PreRelease: featuregate.Beta},
EndpointSliceTerminatingCondition: {Default: false, PreRelease: featuregate.Alpha},
EndpointSliceNodeName: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, //remove in 1.25
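Graduating the gate to GA with LockToDefault means any attempt to disable it is rejected rather than silently ignored. A minimal sketch of that behavior with k8s.io/component-base/featuregate, using a local gate instead of the global DefaultFeatureGate:

package main

import (
	"fmt"

	"k8s.io/component-base/featuregate"
)

func main() {
	const endpointSlice featuregate.Feature = "EndpointSlice"

	gate := featuregate.NewFeatureGate()
	// Mirror the GA spec from the table above: enabled by default and locked,
	// so it can no longer be overridden.
	if err := gate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		endpointSlice: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
	}); err != nil {
		panic(err)
	}

	fmt.Println(gate.Enabled(endpointSlice)) // true

	// Trying to turn off a locked GA gate fails instead of being ignored.
	if err := gate.Set("EndpointSlice=false"); err != nil {
		fmt.Println("rejected:", err)
	}
}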
@ -76,14 +76,6 @@ func Validate(config *kubeproxyconfig.KubeProxyConfiguration) field.ErrorList {
allErrs = append(allErrs, validateHostPort(config.MetricsBindAddress, newPath.Child("MetricsBindAddress"))...)

dualStackEnabled := effectiveFeatures.Enabled(kubefeatures.IPv6DualStack)
endpointSliceEnabled := effectiveFeatures.Enabled(kubefeatures.EndpointSlice)

// dual stack has strong dependency on endpoint slice since
// the endpoint slice controller is the only one capable of producing
// slices for *all* clusterIPs
if dualStackEnabled && !endpointSliceEnabled {
allErrs = append(allErrs, field.Invalid(newPath.Child("FeatureGates"), config.FeatureGates, "EndpointSlice feature flag must be turned on when turning on DualStack"))
}

if config.ClusterCIDR != "" {
cidrs := strings.Split(config.ClusterCIDR, ",")
@ -122,7 +122,7 @@ func TestValidateKubeProxyConfiguration(t *testing.T) {
BindAddress: "10.10.12.11",
HealthzBindAddress: "0.0.0.0:12345",
MetricsBindAddress: "127.0.0.1:10249",
FeatureGates: map[string]bool{"IPv6DualStack": true, "EndpointSlice": true},
FeatureGates: map[string]bool{"IPv6DualStack": true},
ClusterCIDR: "192.168.59.0/24",
UDPIdleTimeout: metav1.Duration{Duration: 1 * time.Second},
ConfigSyncPeriod: metav1.Duration{Duration: 1 * time.Second},
@ -285,7 +285,7 @@ func TestValidateKubeProxyConfiguration(t *testing.T) {
HealthzBindAddress: "0.0.0.0:12345",
MetricsBindAddress: "127.0.0.1:10249",
// DualStack ClusterCIDR without feature flag enabled
FeatureGates: map[string]bool{"IPv6DualStack": false, "EndpointSlice": false},
FeatureGates: map[string]bool{"IPv6DualStack": false},
ClusterCIDR: "192.168.59.0/24,fd00:192:168::/64",
UDPIdleTimeout: metav1.Duration{Duration: 1 * time.Second},
ConfigSyncPeriod: metav1.Duration{Duration: 1 * time.Second},
@ -303,36 +303,12 @@ func TestValidateKubeProxyConfiguration(t *testing.T) {
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("ClusterCIDR"), "192.168.59.0/24,fd00:192:168::/64", "only one CIDR allowed (e.g. 10.100.0.0/16 or fde4:8dba:82e1::/48)")},
},
"DualStack feature-enabled but EndpointSlice feature disabled": {
config: kubeproxyconfig.KubeProxyConfiguration{
BindAddress: "10.10.12.11",
HealthzBindAddress: "0.0.0.0:12345",
MetricsBindAddress: "127.0.0.1:10249",
// DualStack ClusterCIDR with feature flag enabled but EndpointSlice is not enabled
FeatureGates: map[string]bool{"IPv6DualStack": true, "EndpointSlice": false},
ClusterCIDR: "192.168.59.0/24,fd00:192:168::/64",
UDPIdleTimeout: metav1.Duration{Duration: 1 * time.Second},
ConfigSyncPeriod: metav1.Duration{Duration: 1 * time.Second},
IPTables: kubeproxyconfig.KubeProxyIPTablesConfiguration{
MasqueradeAll: true,
SyncPeriod: metav1.Duration{Duration: 5 * time.Second},
MinSyncPeriod: metav1.Duration{Duration: 2 * time.Second},
},
Conntrack: kubeproxyconfig.KubeProxyConntrackConfiguration{
MaxPerCore: pointer.Int32Ptr(1),
Min: pointer.Int32Ptr(1),
TCPEstablishedTimeout: &metav1.Duration{Duration: 5 * time.Second},
TCPCloseWaitTimeout: &metav1.Duration{Duration: 5 * time.Second},
},
},
expectedErrs: field.ErrorList{field.Invalid(newPath.Child("FeatureGates"), map[string]bool{"EndpointSlice": false, "IPv6DualStack": true}, "EndpointSlice feature flag must be turned on when turning on DualStack")},
},
"Invalid number of ClusterCIDRs": {
config: kubeproxyconfig.KubeProxyConfiguration{
BindAddress: "10.10.12.11",
HealthzBindAddress: "0.0.0.0:12345",
MetricsBindAddress: "127.0.0.1:10249",
FeatureGates: map[string]bool{"IPv6DualStack": true, "EndpointSlice": true},
FeatureGates: map[string]bool{"IPv6DualStack": true},
ClusterCIDR: "192.168.59.0/24,fd00:192:168::/64,10.0.0.0/16",
UDPIdleTimeout: metav1.Duration{Duration: 1 * time.Second},
ConfigSyncPeriod: metav1.Duration{Duration: 1 * time.Second},
@ -149,35 +149,33 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
},
})

if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "endpointslice-controller"},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("services", "pods", "nodes").RuleOrDie(),
// The controller needs to be able to set a service's finalizers to be able to create an EndpointSlice
// resource that is owned by the service and sets blockOwnerDeletion=true in its ownerRef.
rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("services/finalizers").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "create", "update", "delete").Groups(discoveryGroup).Resources("endpointslices").RuleOrDie(),
eventsRule(),
},
})
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "endpointslice-controller"},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("services", "pods", "nodes").RuleOrDie(),
// The controller needs to be able to set a service's finalizers to be able to create an EndpointSlice
// resource that is owned by the service and sets blockOwnerDeletion=true in its ownerRef.
rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("services/finalizers").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "create", "update", "delete").Groups(discoveryGroup).Resources("endpointslices").RuleOrDie(),
eventsRule(),
},
})

addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "endpointslicemirroring-controller"},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("services", "endpoints").RuleOrDie(),
// The controller needs to be able to set a service's finalizers to be able to create an EndpointSlice
// resource that is owned by the service and sets blockOwnerDeletion=true in its ownerRef.
rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("services/finalizers").RuleOrDie(),
// The controller needs to be able to set a service's finalizers to be able to create an EndpointSlice
// resource that is owned by the endpoint and sets blockOwnerDeletion=true in its ownerRef.
// see https://github.com/openshift/kubernetes/blob/8691466059314c3f7d6dcffcbb76d14596ca716c/pkg/controller/endpointslicemirroring/utils.go#L87-L88
rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("endpoints/finalizers").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "create", "update", "delete").Groups(discoveryGroup).Resources("endpointslices").RuleOrDie(),
eventsRule(),
},
})
}
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "endpointslicemirroring-controller"},
Rules: []rbacv1.PolicyRule{
rbacv1helpers.NewRule("get", "list", "watch").Groups(legacyGroup).Resources("services", "endpoints").RuleOrDie(),
// The controller needs to be able to set a service's finalizers to be able to create an EndpointSlice
// resource that is owned by the service and sets blockOwnerDeletion=true in its ownerRef.
rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("services/finalizers").RuleOrDie(),
// The controller needs to be able to set a service's finalizers to be able to create an EndpointSlice
// resource that is owned by the endpoint and sets blockOwnerDeletion=true in its ownerRef.
// see https://github.com/openshift/kubernetes/blob/8691466059314c3f7d6dcffcbb76d14596ca716c/pkg/controller/endpointslicemirroring/utils.go#L87-L88
rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("endpoints/finalizers").RuleOrDie(),
rbacv1helpers.NewRule("get", "list", "create", "update", "delete").Groups(discoveryGroup).Resources("endpointslices").RuleOrDie(),
eventsRule(),
},
})

if utilfeature.DefaultFeatureGate.Enabled(features.ExpandPersistentVolumes) {
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
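For context on the finalizer rules repeated in both roles above: setting blockOwnerDeletion=true on the slice's ownerRef triggers the owner-references permission check, which requires update access on the owner's finalizers subresource, hence the services/finalizers and endpoints/finalizers rules. The sketch below only illustrates the shape of such an ownerRef; the object names and UID are made up.

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilpointer "k8s.io/utils/pointer"
)

func main() {
	// An EndpointSlice owned by its Service; BlockOwnerDeletion is the flag
	// that makes the "services/finalizers" update permission necessary.
	slice := discovery.EndpointSlice{
		ObjectMeta: metav1.ObjectMeta{
			Name: "example-abc12",
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion:         "v1",
				Kind:               "Service",
				Name:               "example",
				UID:                types.UID("1234-abcd"),
				Controller:         utilpointer.BoolPtr(true),
				BlockOwnerDeletion: utilpointer.BoolPtr(true),
			}},
		},
		AddressType: discovery.AddressTypeIPv4,
	}
	fmt.Printf("%+v\n", slice.ObjectMeta.OwnerReferences[0])
}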
@ -512,9 +512,7 @@ func ClusterRoles() []rbacv1.ClusterRole {

eventsRule(),
}
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
nodeProxierRules = append(nodeProxierRules, rbacv1helpers.NewRule("list", "watch").Groups(discoveryGroup).Resources("endpointslices").RuleOrDie())
}
nodeProxierRules = append(nodeProxierRules, rbacv1helpers.NewRule("list", "watch").Groups(discoveryGroup).Resources("endpointslices").RuleOrDie())
roles = append(roles, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: "system:node-proxier"},
Rules: nodeProxierRules,
@ -115,7 +115,7 @@ func TestDualStackEndpoints(t *testing.T) {
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Nodes(),
informers.Discovery().V1beta1().EndpointSlices(),
informers.Discovery().V1().EndpointSlices(),
int32(100),
client,
1*time.Second)
@ -23,7 +23,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@ -60,14 +60,14 @@ func TestEndpointSliceMirroring(t *testing.T) {
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Nodes(),
informers.Discovery().V1beta1().EndpointSlices(),
informers.Discovery().V1().EndpointSlices(),
int32(100),
client,
1*time.Second)

epsmController := endpointslicemirroring.NewController(
informers.Core().V1().Endpoints(),
informers.Discovery().V1beta1().EndpointSlices(),
informers.Discovery().V1().EndpointSlices(),
informers.Core().V1().Services(),
int32(100),
client,