Merge pull request #103451 from swetharepakula/ga-proxy-gates

Graduate EndpointSliceProxying and WindowsEndpointSliceProxying Gates
This commit is contained in:
Kubernetes Prow Robot 2021-07-07 18:09:13 -07:00 committed by GitHub
commit 8fb777efb0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1498 additions and 1850 deletions

View File

@ -364,7 +364,7 @@ func newProxyServer(
} }
} }
useEndpointSlices := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) useEndpointSlices := true
if proxyMode == proxyModeUserspace { if proxyMode == proxyModeUserspace {
// userspace mode doesn't support endpointslice. // userspace mode doesn't support endpointslice.
useEndpointSlices = false useEndpointSlices = false

View File

@ -160,7 +160,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
return nil, fmt.Errorf("unable to create proxier: %v", err) return nil, fmt.Errorf("unable to create proxier: %v", err)
} }
} }
useEndpointSlices := utilfeature.DefaultFeatureGate.Enabled(features.WindowsEndpointSliceProxying) useEndpointSlices := true
if proxyMode == proxyModeUserspace { if proxyMode == proxyModeUserspace {
// userspace mode doesn't support endpointslice. // userspace mode doesn't support endpointslice.
useEndpointSlices = false useEndpointSlices = false

View File

@ -426,6 +426,7 @@ const (
// kep: http://kep.k8s.io/752 // kep: http://kep.k8s.io/752
// alpha: v1.18 // alpha: v1.18
// beta: v1.19 // beta: v1.19
// ga: v1.22
// //
// Enable Endpoint Slice consumption by kube-proxy for improved scalability. // Enable Endpoint Slice consumption by kube-proxy for improved scalability.
EndpointSliceProxying featuregate.Feature = "EndpointSliceProxying" EndpointSliceProxying featuregate.Feature = "EndpointSliceProxying"
@ -434,6 +435,7 @@ const (
// kep: http://kep.k8s.io/752 // kep: http://kep.k8s.io/752
// alpha: v1.19 // alpha: v1.19
// beta: v1.21 // beta: v1.21
// ga: v1.22
// //
// Enable Endpoint Slice consumption by kube-proxy in Windows for improved scalability. // Enable Endpoint Slice consumption by kube-proxy in Windows for improved scalability.
WindowsEndpointSliceProxying featuregate.Feature = "WindowsEndpointSliceProxying" WindowsEndpointSliceProxying featuregate.Feature = "WindowsEndpointSliceProxying"
@ -834,11 +836,11 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
PodOverhead: {Default: true, PreRelease: featuregate.Beta}, PodOverhead: {Default: true, PreRelease: featuregate.Beta},
IPv6DualStack: {Default: true, PreRelease: featuregate.Beta}, IPv6DualStack: {Default: true, PreRelease: featuregate.Beta},
EndpointSlice: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25 EndpointSlice: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25
EndpointSliceProxying: {Default: true, PreRelease: featuregate.Beta}, EndpointSliceProxying: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25
EndpointSliceTerminatingCondition: {Default: false, PreRelease: featuregate.Alpha}, EndpointSliceTerminatingCondition: {Default: false, PreRelease: featuregate.Alpha},
ProxyTerminatingEndpoints: {Default: false, PreRelease: featuregate.Alpha}, ProxyTerminatingEndpoints: {Default: false, PreRelease: featuregate.Alpha},
EndpointSliceNodeName: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, //remove in 1.25 EndpointSliceNodeName: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, //remove in 1.25
WindowsEndpointSliceProxying: {Default: true, PreRelease: featuregate.Beta}, WindowsEndpointSliceProxying: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25
StartupProbe: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23 StartupProbe: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23
AllowInsecureBackendProxy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23 AllowInsecureBackendProxy: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.23
PodDisruptionBudget: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25 PodDisruptionBudget: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.25

View File

@ -182,8 +182,8 @@ type EndpointChangeTracker struct {
} }
// NewEndpointChangeTracker initializes an EndpointsChangeMap // NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, endpointSlicesEnabled bool, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker { func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, ipFamily v1.IPFamily, recorder events.EventRecorder, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
ect := &EndpointChangeTracker{ return &EndpointChangeTracker{
hostname: hostname, hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange), items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: makeEndpointInfo, makeEndpointInfo: makeEndpointInfo,
@ -192,11 +192,8 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: time.Now(), trackerStartTime: time.Now(),
processEndpointsMapChange: processEndpointsMapChange, processEndpointsMapChange: processEndpointsMapChange,
endpointSliceCache: NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo),
} }
if endpointSlicesEnabled {
ect.endpointSliceCache = NewEndpointSliceCache(hostname, ipFamily, recorder, makeEndpointInfo)
}
return ect
} }
// Update updates given service's endpoints change map based on the <previous, current> endpoints pair. It returns true // Update updates given service's endpoints change map based on the <previous, current> endpoints pair. It returns true

View File

@ -452,7 +452,7 @@ func TestEndpointsToEndpointsMap(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) { t.Run(tc.desc, func(t *testing.T) {
epTracker := NewEndpointChangeTracker("test-hostname", nil, tc.ipFamily, nil, false, nil) epTracker := NewEndpointChangeTracker("test-hostname", nil, tc.ipFamily, nil, nil)
// outputs // outputs
newEndpoints := epTracker.endpointsToEndpointsMap(tc.newEndpoints) newEndpoints := epTracker.endpointsToEndpointsMap(tc.newEndpoints)
@ -1531,7 +1531,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
// test starting from an empty state // test starting from an empty state
"add a simple slice that doesn't already exist": { "add a simple slice that doesn't already exist": {
startingSlices: []*discovery.EndpointSlice{}, startingSlices: []*discovery.EndpointSlice{},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1554,7 +1554,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{ startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1566,7 +1566,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{ startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice, paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1579,7 +1579,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1611,7 +1611,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}), paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1641,7 +1641,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: true, paramRemoveSlice: true,
@ -1663,7 +1663,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: true, paramRemoveSlice: true,
@ -1675,7 +1675,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{ startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1698,7 +1698,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{ startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 2, 1, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1720,7 +1720,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1748,7 +1748,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1805,64 +1805,31 @@ func TestCheckoutChanges(t *testing.T) {
testCases := map[string]struct { testCases := map[string]struct {
endpointChangeTracker *EndpointChangeTracker endpointChangeTracker *EndpointChangeTracker
expectedChanges []*endpointsChange expectedChanges []*endpointsChange
useEndpointSlices bool
items map[types.NamespacedName]*endpointsChange items map[types.NamespacedName]*endpointsChange
appliedSlices []*discovery.EndpointSlice appliedSlices []*discovery.EndpointSlice
pendingSlices []*discovery.EndpointSlice pendingSlices []*discovery.EndpointSlice
}{ }{
"empty slices": { "empty slices": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
expectedChanges: []*endpointsChange{}, expectedChanges: []*endpointsChange{},
useEndpointSlices: true,
appliedSlices: []*discovery.EndpointSlice{}, appliedSlices: []*discovery.EndpointSlice{},
pendingSlices: []*discovery.EndpointSlice{}, pendingSlices: []*discovery.EndpointSlice{},
}, },
"without slices, empty items": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, false, nil),
expectedChanges: []*endpointsChange{},
items: map[types.NamespacedName]*endpointsChange{},
useEndpointSlices: false,
},
"without slices, simple items": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, false, nil),
expectedChanges: []*endpointsChange{{
previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "", true, true, false), newTestEp("10.0.1.2:80", "", true, true, false)},
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", "", true, true, false), newTestEp("10.0.1.2:443", "", true, true, false)},
},
current: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "", true, true, false), newTestEp("10.0.1.2:80", "", true, true, false)},
},
}},
items: map[types.NamespacedName]*endpointsChange{
{Namespace: "ns1", Name: "svc1"}: {
previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "", true, true, false), newTestEp("10.0.1.2:80", "", true, true, false)},
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443", "", true, true, false), newTestEp("10.0.1.2:443", "", true, true, false)},
},
current: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "", true, true, false), newTestEp("10.0.1.2:80", "", true, true, false)},
},
},
},
useEndpointSlices: false,
},
"adding initial slice": { "adding initial slice": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
expectedChanges: []*endpointsChange{{ expectedChanges: []*endpointsChange{{
previous: EndpointsMap{}, previous: EndpointsMap{},
current: EndpointsMap{ current: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", false, true, true), newTestEp("10.0.1.3:80", "host1", false, false, false)}, svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", false, true, true), newTestEp("10.0.1.3:80", "host1", false, false, false)},
}, },
}}, }},
useEndpointSlices: true, appliedSlices: []*discovery.EndpointSlice{},
appliedSlices: []*discovery.EndpointSlice{},
pendingSlices: []*discovery.EndpointSlice{ pendingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}), generateEndpointSlice("svc1", "ns1", 1, 3, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
}, },
}, },
"removing port in update": { "removing port in update": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, true, nil), endpointChangeTracker: NewEndpointChangeTracker("", nil, v1.IPv4Protocol, nil, nil),
expectedChanges: []*endpointsChange{{ expectedChanges: []*endpointsChange{{
previous: EndpointsMap{ previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", true, true, false), newTestEp("10.0.1.3:80", "host1", false, false, false)}, svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", true, true, false), newTestEp("10.0.1.3:80", "host1", false, false, false)},
@ -1872,7 +1839,6 @@ func TestCheckoutChanges(t *testing.T) {
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", true, true, false), newTestEp("10.0.1.3:80", "host1", false, false, false)}, svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1", true, true, false), newTestEp("10.0.1.2:80", "host1", true, true, false), newTestEp("10.0.1.3:80", "host1", false, false, false)},
}, },
}}, }},
useEndpointSlices: true,
appliedSlices: []*discovery.EndpointSlice{ appliedSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 3, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
@ -1884,18 +1850,13 @@ func TestCheckoutChanges(t *testing.T) {
for name, tc := range testCases { for name, tc := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
if tc.useEndpointSlices { for _, slice := range tc.appliedSlices {
for _, slice := range tc.appliedSlices { tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false) }
} tc.endpointChangeTracker.checkoutChanges()
tc.endpointChangeTracker.checkoutChanges() for _, slice := range tc.pendingSlices {
for _, slice := range tc.pendingSlices { tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
}
} else {
tc.endpointChangeTracker.items = tc.items
} }
changes := tc.endpointChangeTracker.checkoutChanges() changes := tc.endpointChangeTracker.checkoutChanges()
if len(tc.expectedChanges) != len(changes) { if len(tc.expectedChanges) != len(changes) {

View File

@ -191,10 +191,9 @@ type Proxier struct {
endpointsMap proxy.EndpointsMap endpointsMap proxy.EndpointsMap
portsMap map[utilnet.LocalPort]utilnet.Closeable portsMap map[utilnet.LocalPort]utilnet.Closeable
nodeLabels map[string]string nodeLabels map[string]string
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true // endpointSlicesSynced, and servicesSynced are set to true
// when corresponding objects are synced after startup. This is used to avoid // when corresponding objects are synced after startup. This is used to avoid
// updating iptables with some partial data after kube-proxy restart. // updating iptables with some partial data after kube-proxy restart.
endpointsSynced bool
endpointSlicesSynced bool endpointSlicesSynced bool
servicesSynced bool servicesSynced bool
initialized int32 initialized int32
@ -281,8 +280,6 @@ func NewProxier(ipt utiliptables.Interface,
masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue) masqueradeMark := fmt.Sprintf("%#08x", masqueradeValue)
klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark) klog.V(2).InfoS("Using iptables mark for masquerade", "ipFamily", ipt.Protocol(), "mark", masqueradeMark)
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
ipFamily := v1.IPv4Protocol ipFamily := v1.IPv4Protocol
@ -302,7 +299,7 @@ func NewProxier(ipt utiliptables.Interface,
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, endpointSlicesEnabled, nil), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, ipFamily, recorder, nil),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
iptables: ipt, iptables: ipt,
masqueradeAll: masqueradeAll, masqueradeAll: masqueradeAll,
@ -575,48 +572,31 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() { func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock() proxier.mu.Lock()
proxier.servicesSynced = true proxier.servicesSynced = true
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) { proxier.setInitialized(proxier.endpointSlicesSynced)
proxier.setInitialized(proxier.endpointSlicesSynced)
} else {
proxier.setInitialized(proxier.endpointsSynced)
}
proxier.mu.Unlock() proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime. // Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules() proxier.syncProxyRules()
} }
// iptables proxier only uses EndpointSlice, the following methods
// exist to implement the Proxier interface but are noops
// OnEndpointsAdd is called whenever creation of new endpoints object // OnEndpointsAdd is called whenever creation of new endpoints object
// is observed. // is observed.
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {}
proxier.OnEndpointsUpdate(nil, endpoints)
}
// OnEndpointsUpdate is called whenever modification of an existing // OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed. // endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {}
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointsDelete is called whenever deletion of an existing endpoints // OnEndpointsDelete is called whenever deletion of an existing endpoints
// object is observed. // object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {}
proxier.OnEndpointsUpdate(endpoints, nil)
}
// OnEndpointsSynced is called once all the initial event handlers were // OnEndpointsSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache. // called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointsSynced() { func (proxier *Proxier) OnEndpointsSynced() {}
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.setInitialized(proxier.servicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed. // is observed.

File diff suppressed because it is too large Load Diff

View File

@ -221,10 +221,9 @@ type Proxier struct {
endpointsMap proxy.EndpointsMap endpointsMap proxy.EndpointsMap
portsMap map[utilnet.LocalPort]utilnet.Closeable portsMap map[utilnet.LocalPort]utilnet.Closeable
nodeLabels map[string]string nodeLabels map[string]string
// endpointsSynced, endpointSlicesSynced, and servicesSynced are set to true when // endpointSlicesSynced, and servicesSynced are set to true when
// corresponding objects are synced after startup. This is used to avoid updating // corresponding objects are synced after startup. This is used to avoid updating
// ipvs rules with some partial data after kube-proxy restart. // ipvs rules with some partial data after kube-proxy restart.
endpointsSynced bool
endpointSlicesSynced bool endpointSlicesSynced bool
servicesSynced bool servicesSynced bool
initialized int32 initialized int32
@ -438,8 +437,6 @@ func NewProxier(ipt utiliptables.Interface,
serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder)
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying)
ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses) ipFamilyMap := utilproxy.MapCIDRsByIPFamily(nodePortAddresses)
nodePortAddresses = ipFamilyMap[ipFamily] nodePortAddresses = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily // Log the IPs not matching the ipFamily
@ -456,7 +453,7 @@ func NewProxier(ipt utiliptables.Interface,
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, ipFamily, recorder, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, endpointSlicesEnabled, nil), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, ipFamily, recorder, nil),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod, minSyncPeriod: minSyncPeriod,
excludeCIDRs: parsedExcludeCIDRs, excludeCIDRs: parsedExcludeCIDRs,
@ -881,44 +878,27 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() { func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock() proxier.mu.Lock()
proxier.servicesSynced = true proxier.servicesSynced = true
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) { proxier.setInitialized(proxier.endpointSlicesSynced)
proxier.setInitialized(proxier.endpointSlicesSynced)
} else {
proxier.setInitialized(proxier.endpointsSynced)
}
proxier.mu.Unlock() proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime. // Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules() proxier.syncProxyRules()
} }
// The following methods exist to implement the Proxier interface however
// ipvs proxier only uses EndpointSlices so the following are noops
// OnEndpointsAdd is called whenever creation of new endpoints object is observed. // OnEndpointsAdd is called whenever creation of new endpoints object is observed.
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {}
proxier.OnEndpointsUpdate(nil, endpoints)
}
// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed. // OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {}
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed. // OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {}
proxier.OnEndpointsUpdate(endpoints, nil)
}
// OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache. // OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointsSynced() { func (proxier *Proxier) OnEndpointsSynced() {}
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.setInitialized(proxier.servicesSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed. // is observed.

File diff suppressed because it is too large Load Diff

View File

@ -27,7 +27,7 @@ import (
// labels, and enabled feature gates. This is primarily used to enable topology // labels, and enabled feature gates. This is primarily used to enable topology
// aware routing. // aware routing.
func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) []Endpoint { func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) []Endpoint {
if svcInfo.NodeLocalExternal() || !utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying) { if svcInfo.NodeLocalExternal() {
return endpoints return endpoints
} }

View File

@ -33,19 +33,17 @@ func TestFilterEndpoints(t *testing.T) {
zoneHints sets.String zoneHints sets.String
} }
testCases := []struct { testCases := []struct {
name string name string
epsProxyingEnabled bool hintsEnabled bool
hintsEnabled bool nodeLabels map[string]string
nodeLabels map[string]string serviceInfo ServicePort
serviceInfo ServicePort endpoints []endpoint
endpoints []endpoint expectedEndpoints []endpoint
expectedEndpoints []endpoint
}{{ }{{
name: "hints + eps proxying enabled, hints annotation == auto", name: "hints enabled, hints annotation == auto",
hintsEnabled: true, hintsEnabled: true,
epsProxyingEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "auto"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "auto"},
endpoints: []endpoint{ endpoints: []endpoint{
{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")},
{ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")},
@ -57,11 +55,10 @@ func TestFilterEndpoints(t *testing.T) {
{ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")},
}, },
}, { }, {
name: "hints + eps proxying enabled, hints annotation == disabled, hints ignored", name: "hints, hints annotation == disabled, hints ignored",
hintsEnabled: true, hintsEnabled: true,
epsProxyingEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "disabled"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "disabled"},
endpoints: []endpoint{ endpoints: []endpoint{
{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")},
{ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")},
@ -75,11 +72,10 @@ func TestFilterEndpoints(t *testing.T) {
{ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")},
}, },
}, { }, {
name: "hints + eps proxying enabled, hints annotation == aUto (wrong capitalization), hints ignored", name: "hints, hints annotation == aUto (wrong capitalization), hints ignored",
hintsEnabled: true, hintsEnabled: true,
epsProxyingEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "aUto"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "aUto"},
endpoints: []endpoint{ endpoints: []endpoint{
{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")},
{ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")},
@ -93,11 +89,10 @@ func TestFilterEndpoints(t *testing.T) {
{ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")},
}, },
}, { }, {
name: "hints + eps proxying enabled, hints annotation empty, hints ignored", name: "hints, hints annotation empty, hints ignored",
hintsEnabled: true, hintsEnabled: true,
epsProxyingEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, serviceInfo: &BaseServiceInfo{nodeLocalExternal: false},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false},
endpoints: []endpoint{ endpoints: []endpoint{
{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")},
{ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")},
@ -111,29 +106,10 @@ func TestFilterEndpoints(t *testing.T) {
{ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")},
}, },
}, { }, {
name: "hints enabled, eps proxying not, hints are ignored", name: "node local endpoints, hints are ignored",
hintsEnabled: true, hintsEnabled: true,
epsProxyingEnabled: false, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, serviceInfo: &BaseServiceInfo{nodeLocalExternal: true},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false},
endpoints: []endpoint{
{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")},
{ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")},
{ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")},
{ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")},
},
expectedEndpoints: []endpoint{
{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")},
{ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")},
{ip: "10.1.2.5", zoneHints: sets.NewString("zone-c")},
{ip: "10.1.2.6", zoneHints: sets.NewString("zone-a")},
},
}, {
name: "node local endpoints, hints are ignored",
hintsEnabled: true,
epsProxyingEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: true},
endpoints: []endpoint{ endpoints: []endpoint{
{ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")}, {ip: "10.1.2.3", zoneHints: sets.NewString("zone-a")},
{ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")}, {ip: "10.1.2.4", zoneHints: sets.NewString("zone-b")},
@ -158,7 +134,6 @@ func TestFilterEndpoints(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.EndpointSliceProxying, tc.epsProxyingEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)()
endpoints := []Endpoint{} endpoints := []Endpoint{}

View File

@ -43,7 +43,6 @@ import (
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/apis/config" "k8s.io/kubernetes/pkg/proxy/apis/config"
@ -458,10 +457,9 @@ type Proxier struct {
mu sync.Mutex // protects the following fields mu sync.Mutex // protects the following fields
serviceMap proxy.ServiceMap serviceMap proxy.ServiceMap
endpointsMap proxy.EndpointsMap endpointsMap proxy.EndpointsMap
// endpointsSynced and servicesSynced are set to true when corresponding // endpointSlicesSynced and servicesSynced are set to true when corresponding
// objects are synced after startup. This is used to avoid updating hns policies // objects are synced after startup. This is used to avoid updating hns policies
// with some partial data after kube-proxy restart. // with some partial data after kube-proxy restart.
endpointsSynced bool
endpointSlicesSynced bool endpointSlicesSynced bool
servicesSynced bool servicesSynced bool
isIPv6Mode bool isIPv6Mode bool
@ -627,7 +625,6 @@ func NewProxier(
} }
isIPv6 := utilnet.IsIPv6(nodeIP) isIPv6 := utilnet.IsIPv6(nodeIP)
endpointSlicesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.WindowsEndpointSliceProxying)
proxier := &Proxier{ proxier := &Proxier{
endPointsRefCount: make(endPointsReferenceCountMap), endPointsRefCount: make(endPointsReferenceCountMap),
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
@ -654,7 +651,7 @@ func NewProxier(
ipFamily = v1.IPv6Protocol ipFamily = v1.IPv6Protocol
} }
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange) serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, ipFamily, recorder, proxier.serviceMapChange)
endPointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, endpointSlicesEnabled, proxier.endpointsMapChange) endPointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, ipFamily, recorder, proxier.endpointsMapChange)
proxier.endpointsChanges = endPointChangeTracker proxier.endpointsChanges = endPointChangeTracker
proxier.serviceChanges = serviceChanges proxier.serviceChanges = serviceChanges
@ -830,11 +827,7 @@ func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
func (proxier *Proxier) OnServiceSynced() { func (proxier *Proxier) OnServiceSynced() {
proxier.mu.Lock() proxier.mu.Lock()
proxier.servicesSynced = true proxier.servicesSynced = true
if utilfeature.DefaultFeatureGate.Enabled(features.WindowsEndpointSliceProxying) { proxier.setInitialized(proxier.endpointSlicesSynced)
proxier.setInitialized(proxier.endpointSlicesSynced)
} else {
proxier.setInitialized(proxier.endpointsSynced)
}
proxier.mu.Unlock() proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime. // Sync unconditionally - this is called once per lifetime.
@ -855,38 +848,24 @@ func shouldSkipService(svcName types.NamespacedName, service *v1.Service) bool {
return false return false
} }
// The following methods exist to implement the proxier interface, however
// winkernel proxier only uses EndpointSlice, so the following are noops.
// OnEndpointsAdd is called whenever creation of new endpoints object // OnEndpointsAdd is called whenever creation of new endpoints object
// is observed. // is observed.
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {}
proxier.OnEndpointsUpdate(nil, endpoints)
}
// OnEndpointsUpdate is called whenever modification of an existing // OnEndpointsUpdate is called whenever modification of an existing
// endpoints object is observed. // endpoints object is observed.
func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {}
if proxier.endpointsChanges.Update(oldEndpoints, endpoints) && proxier.isInitialized() {
proxier.Sync()
}
}
// OnEndpointsDelete is called whenever deletion of an existing endpoints // OnEndpointsDelete is called whenever deletion of an existing endpoints
// object is observed. // object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) { func (proxier *Proxier) OnEndpointsDelete(endpoints *v1.Endpoints) {}
proxier.OnEndpointsUpdate(endpoints, nil)
}
// OnEndpointsSynced is called once all the initial event handlers were // OnEndpointsSynced is called once all the initial event handlers were
// called and the state is fully propagated to local cache. // called and the state is fully propagated to local cache.
func (proxier *Proxier) OnEndpointsSynced() { func (proxier *Proxier) OnEndpointsSynced() {}
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.setInitialized(proxier.servicesSynced && proxier.endpointsSynced)
proxier.mu.Unlock()
// Sync unconditionally - this is called once per lifetime.
proxier.syncProxyRules()
}
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object // OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
// is observed. // is observed.

View File

@ -111,7 +111,7 @@ func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
return nil return nil
} }
func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string, endpointSliceEnabled bool) *Proxier { func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier {
sourceVip := "192.168.1.2" sourceVip := "192.168.1.2"
hnsNetworkInfo := &hnsNetworkInfo{ hnsNetworkInfo := &hnsNetworkInfo{
id: strings.ToUpper(guid), id: strings.ToUpper(guid),
@ -134,7 +134,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
} }
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange) serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange)
endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, endpointSliceEnabled, proxier.endpointsMapChange) endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange)
proxier.endpointsChanges = endpointChangeTracker proxier.endpointsChanges = endpointChangeTracker
proxier.serviceChanges = serviceChanges proxier.serviceChanges = serviceChanges
@ -143,7 +143,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
func TestCreateServiceVip(t *testing.T) { func TestCreateServiceVip(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, false) proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -178,7 +178,6 @@ func TestCreateServiceVip(t *testing.T) {
}} }}
}), }),
) )
makeEndpointsMap(proxier)
proxier.setInitialized(true) proxier.setInitialized(true)
proxier.syncProxyRules() proxier.syncProxyRules()
@ -199,7 +198,7 @@ func TestCreateServiceVip(t *testing.T) {
func TestCreateRemoteEndpointOverlay(t *testing.T) { func TestCreateRemoteEndpointOverlay(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, false) proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -212,6 +211,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
Port: "p80", Port: "p80",
Protocol: v1.ProtocolTCP, Protocol: v1.ProtocolTCP,
} }
tcpProtocol := v1.ProtocolTCP
makeServiceMap(proxier, makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) { makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
@ -225,17 +225,16 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
}} }}
}), }),
) )
makeEndpointsMap(proxier, populateEndpointSlices(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}}, }}
Ports: []v1.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
Name: svcPortName.Port, Name: utilpointer.StringPtr(svcPortName.Port),
Port: int32(svcPort), Port: utilpointer.Int32(int32(svcPort)),
Protocol: v1.ProtocolTCP, Protocol: &tcpProtocol,
}},
}} }}
}), }),
) )
@ -264,18 +263,19 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
func TestCreateRemoteEndpointL2Bridge(t *testing.T) { func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false) proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge")
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
tcpProtocol := v1.ProtocolTCP
svcIP := "10.20.30.41" svcIP := "10.20.30.41"
svcPort := 80 svcPort := 80
svcNodePort := 3001 svcNodePort := 3001
svcPortName := proxy.ServicePortName{ svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"), NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80", Port: "p80",
Protocol: v1.ProtocolTCP, Protocol: tcpProtocol,
} }
makeServiceMap(proxier, makeServiceMap(proxier,
@ -285,22 +285,21 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
svc.Spec.Ports = []v1.ServicePort{{ svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port, Name: svcPortName.Port,
Port: int32(svcPort), Port: int32(svcPort),
Protocol: v1.ProtocolTCP, Protocol: tcpProtocol,
NodePort: int32(svcNodePort), NodePort: int32(svcNodePort),
}} }}
}), }),
) )
makeEndpointsMap(proxier, populateEndpointSlices(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}}, }}
Ports: []v1.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
Name: svcPortName.Port, Name: utilpointer.String(svcPortName.Port),
Port: int32(svcPort), Port: utilpointer.Int32(int32(svcPort)),
Protocol: v1.ProtocolTCP, Protocol: &tcpProtocol,
}},
}} }}
}), }),
) )
@ -327,7 +326,8 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
} }
func TestSharedRemoteEndpointDelete(t *testing.T) { func TestSharedRemoteEndpointDelete(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false) tcpProtocol := v1.ProtocolTCP
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge")
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -372,29 +372,27 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
}} }}
}), }),
) )
makeEndpointsMap(proxier, populateEndpointSlices(proxier,
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName1.Namespace, svcPortName1.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}}, }}
Ports: []v1.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
Name: svcPortName1.Port, Name: utilpointer.StringPtr(svcPortName1.Port),
Port: int32(svcPort1), Port: utilpointer.Int32(int32(svcPort1)),
Protocol: v1.ProtocolTCP, Protocol: &tcpProtocol,
}},
}} }}
}), }),
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName2.Namespace, svcPortName2.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}}, }}
Ports: []v1.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
Name: svcPortName2.Port, Name: utilpointer.StringPtr(svcPortName2.Port),
Port: int32(svcPort2), Port: utilpointer.Int32(int32(svcPort2)),
Protocol: v1.ProtocolTCP, Protocol: &tcpProtocol,
}},
}} }}
}), }),
) )
@ -433,17 +431,16 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
}), }),
) )
deleteEndpoints(proxier, deleteEndpointSlices(proxier,
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName2.Namespace, svcPortName2.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}}, }}
Ports: []v1.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
Name: svcPortName2.Port, Name: utilpointer.StringPtr(svcPortName2.Port),
Port: int32(svcPort2), Port: utilpointer.Int32(int32(svcPort2)),
Protocol: v1.ProtocolTCP, Protocol: &tcpProtocol,
}},
}} }}
}), }),
) )
@ -472,7 +469,7 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
} }
func TestSharedRemoteEndpointUpdate(t *testing.T) { func TestSharedRemoteEndpointUpdate(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false) proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge")
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -518,29 +515,28 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
}), }),
) )
makeEndpointsMap(proxier, tcpProtocol := v1.ProtocolTCP
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) { populateEndpointSlices(proxier,
ept.Subsets = []v1.EndpointSubset{{ makeTestEndpointSlice(svcPortName1.Namespace, svcPortName1.Name, 1, func(eps *discovery.EndpointSlice) {
Addresses: []v1.EndpointAddress{{ eps.AddressType = discovery.AddressTypeIPv4
IP: epIpAddressRemote, eps.Endpoints = []discovery.Endpoint{{
}}, Addresses: []string{epIpAddressRemote},
Ports: []v1.EndpointPort{{ }}
Name: svcPortName1.Port, eps.Ports = []discovery.EndpointPort{{
Port: int32(svcPort1), Name: utilpointer.StringPtr(svcPortName1.Port),
Protocol: v1.ProtocolTCP, Port: utilpointer.Int32(int32(svcPort1)),
}}, Protocol: &tcpProtocol,
}} }}
}), }),
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName2.Namespace, svcPortName2.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}}, }}
Ports: []v1.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
Name: svcPortName2.Port, Name: utilpointer.StringPtr(svcPortName2.Port),
Port: int32(svcPort2), Port: utilpointer.Int32(int32(svcPort2)),
Protocol: v1.ProtocolTCP, Protocol: &tcpProtocol,
}},
}} }}
}), }),
) )
@ -589,40 +585,37 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
}} }}
})) }))
proxier.OnEndpointsUpdate( proxier.OnEndpointSliceUpdate(
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName1.Namespace, svcPortName1.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}}, }}
Ports: []v1.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
Name: svcPortName1.Port, Name: utilpointer.StringPtr(svcPortName1.Port),
Port: int32(svcPort1), Port: utilpointer.Int32(int32(svcPort1)),
Protocol: v1.ProtocolTCP, Protocol: &tcpProtocol,
}},
}} }}
}), }),
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName1.Namespace, svcPortName1.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}},
Ports: []v1.EndpointPort{
{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
},
{
Name: "p443",
Port: int32(443),
Protocol: v1.ProtocolTCP,
}},
}} }}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName1.Port),
Port: utilpointer.Int32(int32(svcPort1)),
Protocol: &tcpProtocol,
},
{
Name: utilpointer.StringPtr("p443"),
Port: utilpointer.Int32(int32(443)),
Protocol: &tcpProtocol,
}}
})) }))
proxier.mu.Lock() proxier.mu.Lock()
proxier.endpointsSynced = true proxier.endpointSlicesSynced = true
proxier.mu.Unlock() proxier.mu.Unlock()
proxier.setInitialized(true) proxier.setInitialized(true)
@ -650,7 +643,8 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
} }
func TestCreateLoadBalancer(t *testing.T) { func TestCreateLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, false) tcpProtocol := v1.ProtocolTCP
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -676,17 +670,16 @@ func TestCreateLoadBalancer(t *testing.T) {
}} }}
}), }),
) )
makeEndpointsMap(proxier, populateEndpointSlices(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
ept.Subsets = []v1.EndpointSubset{{ eps.AddressType = discovery.AddressTypeIPv4
Addresses: []v1.EndpointAddress{{ eps.Endpoints = []discovery.Endpoint{{
IP: epIpAddressRemote, Addresses: []string{epIpAddressRemote},
}}, }}
Ports: []v1.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
Name: svcPortName.Port, Name: utilpointer.StringPtr(svcPortName.Port),
Port: int32(svcPort), Port: utilpointer.Int32(int32(svcPort)),
Protocol: v1.ProtocolTCP, Protocol: &tcpProtocol,
}},
}} }}
}), }),
) )
@ -709,7 +702,7 @@ func TestCreateLoadBalancer(t *testing.T) {
func TestCreateDsrLoadBalancer(t *testing.T) { func TestCreateDsrLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, false) proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -736,17 +729,17 @@ func TestCreateDsrLoadBalancer(t *testing.T) {
}} }}
}), }),
) )
makeEndpointsMap(proxier, tcpProtocol := v1.ProtocolTCP
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) { populateEndpointSlices(proxier,
ept.Subsets = []v1.EndpointSubset{{ makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
Addresses: []v1.EndpointAddress{{ eps.AddressType = discovery.AddressTypeIPv4
IP: epIpAddressRemote, eps.Endpoints = []discovery.Endpoint{{
}}, Addresses: []string{epIpAddressRemote},
Ports: []v1.EndpointPort{{ }}
Name: svcPortName.Port, eps.Ports = []discovery.EndpointPort{{
Port: int32(svcPort), Name: utilpointer.StringPtr(svcPortName.Port),
Protocol: v1.ProtocolTCP, Port: utilpointer.Int32(int32(svcPort)),
}}, Protocol: &tcpProtocol,
}} }}
}), }),
) )
@ -771,7 +764,7 @@ func TestCreateDsrLoadBalancer(t *testing.T) {
func TestEndpointSlice(t *testing.T) { func TestEndpointSlice(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, true) proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -908,33 +901,30 @@ func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Serv
return svc return svc
} }
func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) { func deleteEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) {
for i := range allEndpoints { for i := range allEndpointSlices {
proxier.OnEndpointsAdd(allEndpoints[i]) proxier.OnEndpointSliceDelete(allEndpointSlices[i])
} }
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.endpointsSynced = true proxier.endpointSlicesSynced = true
} }
func deleteEndpoints(proxier *Proxier, allEndpoints ...*v1.Endpoints) { func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) {
for i := range allEndpoints { for i := range allEndpointSlices {
proxier.OnEndpointsDelete(allEndpoints[i]) proxier.OnEndpointSliceAdd(allEndpointSlices[i])
} }
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointsSynced = true
} }
func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints { func makeTestEndpointSlice(namespace, name string, sliceNum int, epsFunc func(*discovery.EndpointSlice)) *discovery.EndpointSlice {
ept := &v1.Endpoints{ eps := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: fmt.Sprintf("%s-%d", name, sliceNum),
Namespace: namespace, Namespace: namespace,
Labels: map[string]string{discovery.LabelServiceName: name},
}, },
} }
eptFunc(ept) epsFunc(eps)
return ept return eps
} }