Merge pull request #90909 from kumarvin123/feature/WindowsEpSlices

EndPointSlices implementation for Windows
This commit is contained in:
Kubernetes Prow Robot 2020-07-01 23:12:01 -07:00 committed by GitHub
commit 8623c26150
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 506 additions and 560 deletions

View File

@ -174,7 +174,7 @@ func newProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi
OOMScoreAdj: config.OOMScoreAdj, OOMScoreAdj: config.OOMScoreAdj,
ConfigSyncPeriod: config.ConfigSyncPeriod.Duration, ConfigSyncPeriod: config.ConfigSyncPeriod.Duration,
HealthzServer: healthzServer, HealthzServer: healthzServer,
UseEndpointSlices: false, UseEndpointSlices: utilfeature.DefaultFeatureGate.Enabled(features.EndpointSliceProxying),
}, nil }, nil
} }

View File

@ -94,6 +94,10 @@ func newBaseEndpointInfo(IP string, port int, isLocal bool, topology map[string]
type makeEndpointFunc func(info *BaseEndpointInfo) Endpoint type makeEndpointFunc func(info *BaseEndpointInfo) Endpoint
// This handler is invoked by the apply function on every change. This function should not modify the
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
type processEndpointsMapChangeFunc func(oldEndpointsMap, newEndpointsMap EndpointsMap)
// EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of // EndpointChangeTracker carries state about uncommitted changes to an arbitrary number of
// Endpoints, keyed by their namespace and name. // Endpoints, keyed by their namespace and name.
type EndpointChangeTracker struct { type EndpointChangeTracker struct {
@ -104,7 +108,8 @@ type EndpointChangeTracker struct {
// items maps a service to is endpointsChange. // items maps a service to is endpointsChange.
items map[types.NamespacedName]*endpointsChange items map[types.NamespacedName]*endpointsChange
// makeEndpointInfo allows proxier to inject customized information when processing endpoint. // makeEndpointInfo allows proxier to inject customized information when processing endpoint.
makeEndpointInfo makeEndpointFunc makeEndpointInfo makeEndpointFunc
processEndpointsMapChange processEndpointsMapChangeFunc
// endpointSliceCache holds a simplified version of endpoint slices. // endpointSliceCache holds a simplified version of endpoint slices.
endpointSliceCache *EndpointSliceCache endpointSliceCache *EndpointSliceCache
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
@ -116,14 +121,15 @@ type EndpointChangeTracker struct {
} }
// NewEndpointChangeTracker initializes an EndpointsChangeMap // NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder, endpointSlicesEnabled bool) *EndpointChangeTracker { func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder, endpointSlicesEnabled bool, processEndpointsMapChange processEndpointsMapChangeFunc) *EndpointChangeTracker {
ect := &EndpointChangeTracker{ ect := &EndpointChangeTracker{
hostname: hostname, hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange), items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: makeEndpointInfo, makeEndpointInfo: makeEndpointInfo,
isIPv6Mode: isIPv6Mode, isIPv6Mode: isIPv6Mode,
recorder: recorder, recorder: recorder,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
processEndpointsMapChange: processEndpointsMapChange,
} }
if endpointSlicesEnabled { if endpointSlicesEnabled {
ect.endpointSliceCache = NewEndpointSliceCache(hostname, isIPv6Mode, recorder, makeEndpointInfo) ect.endpointSliceCache = NewEndpointSliceCache(hostname, isIPv6Mode, recorder, makeEndpointInfo)
@ -388,6 +394,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
// The changes map is cleared after applying them. // The changes map is cleared after applying them.
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints // In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
// that were changed and will result in syncing the proxy rules. // that were changed and will result in syncing the proxy rules.
// apply triggers processEndpointsMapChange on every change.
func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
if ect == nil { if ect == nil {
@ -396,6 +403,9 @@ func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]Servi
changes := ect.checkoutChanges() changes := ect.checkoutChanges()
for _, change := range changes { for _, change := range changes {
if ect.processEndpointsMapChange != nil {
ect.processEndpointsMapChange(change.previous, change.current)
}
em.unmerge(change.previous) em.unmerge(change.previous)
em.merge(change.current) em.merge(change.current)
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)

View File

@ -135,7 +135,7 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.
// This is a coarse test, but it offers some modicum of confidence as the code is evolved. // This is a coarse test, but it offers some modicum of confidence as the code is evolved.
func TestEndpointsToEndpointsMap(t *testing.T) { func TestEndpointsToEndpointsMap(t *testing.T) {
epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil, false) epTracker := NewEndpointChangeTracker("test-hostname", nil, nil, nil, false, nil)
trueVal := true trueVal := true
falseVal := false falseVal := false
@ -1446,7 +1446,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
// test starting from an empty state // test starting from an empty state
"add a simple slice that doesn't already exist": { "add a simple slice that doesn't already exist": {
startingSlices: []*discovery.EndpointSlice{}, startingSlices: []*discovery.EndpointSlice{},
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1469,7 +1469,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{ startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1481,7 +1481,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{ startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice, paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1494,7 +1494,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1526,7 +1526,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}), paramEndpointSlice: generateEndpointSliceWithOffset("svc1", "ns1", 3, 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1556,7 +1556,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: true, paramRemoveSlice: true,
@ -1578,7 +1578,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: true, paramRemoveSlice: true,
@ -1590,7 +1590,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{ startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 1, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1602,7 +1602,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
startingSlices: []*discovery.EndpointSlice{ startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 2, 1, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 2, 1, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 2, 999, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1624,7 +1624,7 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 1, 3, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 2, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
}, },
endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("host1", nil, nil, nil, true, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
paramRemoveSlice: false, paramRemoveSlice: false,
@ -1683,20 +1683,20 @@ func TestCheckoutChanges(t *testing.T) {
pendingSlices []*discovery.EndpointSlice pendingSlices []*discovery.EndpointSlice
}{ }{
"empty slices": { "empty slices": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil),
expectedChanges: []*endpointsChange{}, expectedChanges: []*endpointsChange{},
useEndpointSlices: true, useEndpointSlices: true,
appliedSlices: []*discovery.EndpointSlice{}, appliedSlices: []*discovery.EndpointSlice{},
pendingSlices: []*discovery.EndpointSlice{}, pendingSlices: []*discovery.EndpointSlice{},
}, },
"without slices, empty items": { "without slices, empty items": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false), endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false, nil),
expectedChanges: []*endpointsChange{}, expectedChanges: []*endpointsChange{},
items: map[types.NamespacedName]*endpointsChange{}, items: map[types.NamespacedName]*endpointsChange{},
useEndpointSlices: false, useEndpointSlices: false,
}, },
"without slices, simple items": { "without slices, simple items": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false), endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false, nil),
expectedChanges: []*endpointsChange{{ expectedChanges: []*endpointsChange{{
previous: EndpointsMap{ previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")}, svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", ""), newTestEp("10.0.1.2:80", "")},
@ -1720,7 +1720,7 @@ func TestCheckoutChanges(t *testing.T) {
useEndpointSlices: false, useEndpointSlices: false,
}, },
"adding initial slice": { "adding initial slice": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil),
expectedChanges: []*endpointsChange{{ expectedChanges: []*endpointsChange{{
previous: EndpointsMap{}, previous: EndpointsMap{},
current: EndpointsMap{ current: EndpointsMap{
@ -1734,7 +1734,7 @@ func TestCheckoutChanges(t *testing.T) {
}, },
}, },
"removing port in update": { "removing port in update": {
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true, nil),
expectedChanges: []*endpointsChange{{ expectedChanges: []*endpointsChange{{
previous: EndpointsMap{ previous: EndpointsMap{
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")}, svcPortName0: []Endpoint{newTestEp("10.0.1.1:80", "host1"), newTestEp("10.0.1.2:80", "host1")},

View File

@ -298,9 +298,9 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{ proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, newEndpointInfo, &isIPv6, recorder, endpointSlicesEnabled, nil),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
iptables: ipt, iptables: ipt,
masqueradeAll: masqueradeAll, masqueradeAll: masqueradeAll,

View File

@ -353,9 +353,9 @@ func NewFakeProxier(ipt utiliptables.Interface, endpointSlicesEnabled bool) *Pro
p := &Proxier{ p := &Proxier{
exec: &fakeexec.FakeExec{}, exec: &fakeexec.FakeExec{},
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled), endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, newEndpointInfo, nil, nil, endpointSlicesEnabled, nil),
iptables: ipt, iptables: ipt,
masqueradeMark: "0x4000", masqueradeMark: "0x4000",
localDetector: detectLocal, localDetector: detectLocal,

View File

@ -441,9 +441,9 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{ proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled), endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder, endpointSlicesEnabled, nil),
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
minSyncPeriod: minSyncPeriod, minSyncPeriod: minSyncPeriod,
excludeCIDRs: parseExcludedCIDRs(excludeCIDRs), excludeCIDRs: parseExcludedCIDRs(excludeCIDRs),

View File

@ -122,9 +122,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u
p := &Proxier{ p := &Proxier{
exec: fexec, exec: fexec,
serviceMap: make(proxy.ServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil), serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, nil, nil, nil),
endpointsMap: make(proxy.EndpointsMap), endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled), endpointsChanges: proxy.NewEndpointChangeTracker(testHostname, nil, nil, nil, endpointSlicesEnabled, nil),
excludeCIDRs: excludeCIDRs, excludeCIDRs: excludeCIDRs,
iptables: ipt, iptables: ipt,
ipvs: ipvs, ipvs: ipvs,

View File

@ -198,6 +198,10 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServiceInfo) ServicePort type makeServicePortFunc func(*v1.ServicePort, *v1.Service, *BaseServiceInfo) ServicePort
// This handler is invoked by the apply function on every change. This function should not modify the
// ServiceMap's but just use the changes for any Proxier specific cleanup.
type processServiceMapChangeFunc func(previous, current ServiceMap)
// serviceChange contains all changes to services that happened since proxy rules were synced. For a single object, // serviceChange contains all changes to services that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes, // changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying all of the changes. // current is state after applying all of the changes.
@ -214,19 +218,21 @@ type ServiceChangeTracker struct {
// items maps a service to its serviceChange. // items maps a service to its serviceChange.
items map[types.NamespacedName]*serviceChange items map[types.NamespacedName]*serviceChange
// makeServiceInfo allows proxier to inject customized information when processing service. // makeServiceInfo allows proxier to inject customized information when processing service.
makeServiceInfo makeServicePortFunc makeServiceInfo makeServicePortFunc
processServiceMapChange processServiceMapChangeFunc
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
isIPv6Mode *bool isIPv6Mode *bool
recorder record.EventRecorder recorder record.EventRecorder
} }
// NewServiceChangeTracker initializes a ServiceChangeTracker // NewServiceChangeTracker initializes a ServiceChangeTracker
func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder) *ServiceChangeTracker { func NewServiceChangeTracker(makeServiceInfo makeServicePortFunc, isIPv6Mode *bool, recorder record.EventRecorder, processServiceMapChange processServiceMapChangeFunc) *ServiceChangeTracker {
return &ServiceChangeTracker{ return &ServiceChangeTracker{
items: make(map[types.NamespacedName]*serviceChange), items: make(map[types.NamespacedName]*serviceChange),
makeServiceInfo: makeServiceInfo, makeServiceInfo: makeServiceInfo,
isIPv6Mode: isIPv6Mode, isIPv6Mode: isIPv6Mode,
recorder: recorder, recorder: recorder,
processServiceMapChange: processServiceMapChange,
} }
} }
@ -338,10 +344,14 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
// apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the // apply the changes to ServiceMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the
// udp protocol service cluster ip when service is deleted from the ServiceMap. // udp protocol service cluster ip when service is deleted from the ServiceMap.
// apply triggers processServiceMapChange on every change.
func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) { func (sm *ServiceMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
changes.lock.Lock() changes.lock.Lock()
defer changes.lock.Unlock() defer changes.lock.Unlock()
for _, change := range changes.items { for _, change := range changes.items {
if changes.processServiceMapChange != nil {
changes.processServiceMapChange(change.previous, change.current)
}
sm.merge(change.current) sm.merge(change.current)
// filter out the Update event of current changes from previous changes before calling unmerge() so that can // filter out the Update event of current changes from previous changes before calling unmerge() so that can
// skip deleting the Update events. // skip deleting the Update events.

View File

@ -85,7 +85,7 @@ func makeServicePortName(ns, name, port string, protocol v1.Protocol) ServicePor
} }
func TestServiceToServiceMap(t *testing.T) { func TestServiceToServiceMap(t *testing.T) {
svcTracker := NewServiceChangeTracker(nil, nil, nil) svcTracker := NewServiceChangeTracker(nil, nil, nil, nil)
trueVal := true trueVal := true
falseVal := false falseVal := false
@ -449,9 +449,9 @@ type FakeProxier struct {
func newFakeProxier() *FakeProxier { func newFakeProxier() *FakeProxier {
return &FakeProxier{ return &FakeProxier{
serviceMap: make(ServiceMap), serviceMap: make(ServiceMap),
serviceChanges: NewServiceChangeTracker(nil, nil, nil), serviceChanges: NewServiceChangeTracker(nil, nil, nil, nil),
endpointsMap: make(EndpointsMap), endpointsMap: make(EndpointsMap),
endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil, false), endpointsChanges: NewEndpointChangeTracker(testHostname, nil, nil, nil, false, nil),
} }
} }

View File

@ -15,19 +15,20 @@ go_library(
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
] + select({ ] + select({
"@io_bazel_rules_go//go/platform:windows": [ "@io_bazel_rules_go//go/platform:windows": [
"//pkg/api/v1/service:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/proxy:go_default_library", "//pkg/proxy:go_default_library",
"//pkg/proxy/apis/config:go_default_library", "//pkg/proxy/apis/config:go_default_library",
"//pkg/proxy/config:go_default_library", "//pkg/proxy/config:go_default_library",
"//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/metaproxier:go_default_library", "//pkg/proxy/metaproxier:go_default_library",
"//pkg/proxy/metrics:go_default_library", "//pkg/proxy/metrics:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library", "//pkg/util/async:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
@ -67,11 +68,14 @@ go_test(
"@io_bazel_rules_go//go/platform:windows": [ "@io_bazel_rules_go//go/platform:windows": [
"//pkg/proxy:go_default_library", "//pkg/proxy:go_default_library",
"//pkg/proxy/healthcheck:go_default_library", "//pkg/proxy/healthcheck:go_default_library",
"//pkg/proxy/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1beta1:go_default_library", "//staging/src/k8s.io/api/discovery/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
"//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library", "//vendor/github.com/Microsoft/hcsshim/hcn:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
], ],
"//conditions:default": [], "//conditions:default": [],
}), }),

File diff suppressed because it is too large Load Diff

View File

@ -19,18 +19,20 @@ limitations under the License.
package winkernel package winkernel
import ( import (
"fmt"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1" discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/healthcheck"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
utilpointer "k8s.io/utils/pointer"
"net" "net"
"strings" "strings"
"testing" "testing"
"time" "time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
) )
const testHostName = "test-hostname" const testHostName = "test-hostname"
@ -99,7 +101,7 @@ func (hns fakeHNS) getLoadBalancer(endpoints []endpointsInfo, flags loadBalancer
func (hns fakeHNS) deleteLoadBalancer(hnsID string) error { func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
return nil return nil
} }
func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier { func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string, endpointSliceEnabled bool) *Proxier {
sourceVip := "192.168.1.2" sourceVip := "192.168.1.2"
hnsNetworkInfo := &hnsNetworkInfo{ hnsNetworkInfo := &hnsNetworkInfo{
id: strings.ToUpper(guid), id: strings.ToUpper(guid),
@ -107,11 +109,9 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
networkType: networkType, networkType: networkType,
} }
proxier := &Proxier{ proxier := &Proxier{
portsMap: make(map[localPort]closeable), portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxyServiceMap), serviceMap: make(proxy.ServiceMap),
serviceChanges: newServiceChangeMap(), endpointsMap: make(proxy.EndpointsMap),
endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(hostname),
clusterCIDR: clusterCIDR, clusterCIDR: clusterCIDR,
hostname: testHostName, hostname: testHostName,
nodeIP: nodeIP, nodeIP: nodeIP,
@ -123,12 +123,19 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
hns: newFakeHNS(), hns: newFakeHNS(),
endPointsRefCount: make(endPointsReferenceCountMap), endPointsRefCount: make(endPointsReferenceCountMap),
} }
isIPv6 := false
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, &isIPv6, nil, proxier.serviceMapChange)
endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, &isIPv6, nil, endpointSliceEnabled, proxier.endpointsMapChange)
proxier.endpointsChanges = endpointChangeTracker
proxier.serviceChanges = serviceChanges
return proxier return proxier
} }
func TestCreateServiceVip(t *testing.T) { func TestCreateServiceVip(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -140,6 +147,7 @@ func TestCreateServiceVip(t *testing.T) {
svcPortName := proxy.ServicePortName{ svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"), NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80", Port: "p80",
Protocol: v1.ProtocolTCP,
} }
timeoutSeconds := v1.DefaultClientIPServiceAffinitySeconds timeoutSeconds := v1.DefaultClientIPServiceAffinitySeconds
@ -163,19 +171,26 @@ func TestCreateServiceVip(t *testing.T) {
}), }),
) )
makeEndpointsMap(proxier) makeEndpointsMap(proxier)
proxier.setInitialized(true)
proxier.syncProxyRules() proxier.syncProxyRules()
if proxier.serviceMap[svcPortName].remoteEndpoint == nil { svc := proxier.serviceMap[svcPortName]
t.Error() svcInfo, ok := svc.(*serviceInfo)
} if !ok {
if proxier.serviceMap[svcPortName].remoteEndpoint.ip != svcIP { t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
t.Error()
} else {
if svcInfo.remoteEndpoint == nil {
t.Error()
}
if svcInfo.remoteEndpoint.ip != svcIP {
t.Error()
}
} }
} }
func TestCreateRemoteEndpointOverlay(t *testing.T) { func TestCreateRemoteEndpointOverlay(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -186,6 +201,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
svcPortName := proxy.ServicePortName{ svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"), NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80", Port: "p80",
Protocol: v1.ProtocolTCP,
} }
makeServiceMap(proxier, makeServiceMap(proxier,
@ -207,30 +223,38 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
IP: epIpAddressRemote, IP: epIpAddressRemote,
}}, }},
Ports: []v1.EndpointPort{{ Ports: []v1.EndpointPort{{
Name: svcPortName.Port, Name: svcPortName.Port,
Port: int32(svcPort), Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}}, }},
}} }}
}), }),
) )
proxier.setInitialized(true)
proxier.syncProxyRules() proxier.syncProxyRules()
if proxier.endpointsMap[svcPortName][0].hnsID != guid { ep := proxier.endpointsMap[svcPortName][0]
t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid) epInfo, ok := ep.(*endpointsInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
} else {
if epInfo.hnsID != guid {
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
}
} }
if *proxier.endPointsRefCount[guid] <= 0 { if *proxier.endPointsRefCount[guid] <= 0 {
t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid]) t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid])
} }
if *proxier.endPointsRefCount[guid] != *proxier.endpointsMap[svcPortName][0].refCount { if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *proxier.endpointsMap[svcPortName][0].refCount) t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
} }
} }
func TestCreateRemoteEndpointL2Bridge(t *testing.T) { func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge") proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -241,6 +265,7 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
svcPortName := proxy.ServicePortName{ svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"), NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80", Port: "p80",
Protocol: v1.ProtocolTCP,
} }
makeServiceMap(proxier, makeServiceMap(proxier,
@ -262,31 +287,38 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
IP: epIpAddressRemote, IP: epIpAddressRemote,
}}, }},
Ports: []v1.EndpointPort{{ Ports: []v1.EndpointPort{{
Name: svcPortName.Port, Name: svcPortName.Port,
Port: int32(svcPort), Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}}, }},
}} }}
}), }),
) )
proxier.setInitialized(true)
proxier.syncProxyRules() proxier.syncProxyRules()
ep := proxier.endpointsMap[svcPortName][0]
epInfo, ok := ep.(*endpointsInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
if proxier.endpointsMap[svcPortName][0].hnsID != guid { } else {
t.Errorf("%v does not match %v", proxier.endpointsMap[svcPortName][0].hnsID, guid) if epInfo.hnsID != guid {
t.Errorf("%v does not match %v", epInfo.hnsID, guid)
}
} }
if *proxier.endPointsRefCount[guid] <= 0 { if *proxier.endPointsRefCount[guid] <= 0 {
t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid]) t.Errorf("RefCount not incremented. Current value: %v", *proxier.endPointsRefCount[guid])
} }
if *proxier.endPointsRefCount[guid] != *proxier.endpointsMap[svcPortName][0].refCount { if *proxier.endPointsRefCount[guid] != *epInfo.refCount {
t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *proxier.endpointsMap[svcPortName][0].refCount) t.Errorf("Global refCount: %v does not match endpoint refCount: %v", *proxier.endPointsRefCount[guid], *epInfo.refCount)
} }
} }
func TestCreateLoadBalancer(t *testing.T) { func TestCreateLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay") proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", false)
if proxier == nil { if proxier == nil {
t.Error() t.Error()
} }
@ -297,6 +329,7 @@ func TestCreateLoadBalancer(t *testing.T) {
svcPortName := proxy.ServicePortName{ svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"), NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80", Port: "p80",
Protocol: v1.ProtocolTCP,
} }
makeServiceMap(proxier, makeServiceMap(proxier,
@ -318,20 +351,101 @@ func TestCreateLoadBalancer(t *testing.T) {
IP: epIpAddressRemote, IP: epIpAddressRemote,
}}, }},
Ports: []v1.EndpointPort{{ Ports: []v1.EndpointPort{{
Name: svcPortName.Port, Name: svcPortName.Port,
Port: int32(svcPort), Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}}, }},
}} }}
}), }),
) )
proxier.setInitialized(true)
proxier.syncProxyRules() proxier.syncProxyRules()
if proxier.serviceMap[svcPortName].hnsID != guid { svc := proxier.serviceMap[svcPortName]
t.Errorf("%v does not match %v", proxier.serviceMap[svcPortName].hnsID, guid) svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
} else {
if svcInfo.hnsID != guid {
t.Errorf("%v does not match %v", svcInfo.hnsID, guid)
}
}
}
func TestEndpointSlice(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "Overlay", true)
if proxier == nil {
t.Error()
}
proxier.servicesSynced = true
proxier.endpointSlicesSynced = true
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
}
proxier.OnServiceAdd(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: svcPortName.Name, Namespace: svcPortName.Namespace},
Spec: v1.ServiceSpec{
ClusterIP: "172.20.1.1",
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Name: svcPortName.Port, TargetPort: intstr.FromInt(80), Protocol: v1.ProtocolTCP}},
},
})
// Add initial endpoint slice
tcpProtocol := v1.ProtocolTCP
endpointSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-1", svcPortName.Name),
Namespace: svcPortName.Namespace,
Labels: map[string]string{discovery.LabelServiceName: svcPortName.Name},
},
Ports: []discovery.EndpointPort{{
Name: &svcPortName.Port,
Port: utilpointer.Int32Ptr(80),
Protocol: &tcpProtocol,
}},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"192.168.2.3"},
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
Topology: map[string]string{"kubernetes.io/hostname": "testhost2"},
}},
}
proxier.OnEndpointSliceAdd(endpointSlice)
proxier.setInitialized(true)
proxier.syncProxyRules()
svc := proxier.serviceMap[svcPortName]
svcInfo, ok := svc.(*serviceInfo)
if !ok {
t.Errorf("Failed to cast serviceInfo %q", svcPortName.String())
} else {
if svcInfo.hnsID != guid {
t.Errorf("The Hns Loadbalancer Id %v does not match %v. ServicePortName %q", svcInfo.hnsID, guid, svcPortName.String())
}
}
ep := proxier.endpointsMap[svcPortName][0]
epInfo, ok := ep.(*endpointsInfo)
if !ok {
t.Errorf("Failed to cast endpointsInfo %q", svcPortName.String())
} else {
if epInfo.hnsID != guid {
t.Errorf("Hns EndpointId %v does not match %v. ServicePortName %q", epInfo.hnsID, guid, svcPortName.String())
}
} }
} }
func TestNoopEndpointSlice(t *testing.T) { func TestNoopEndpointSlice(t *testing.T) {
p := Proxier{} p := Proxier{}
p.OnEndpointSliceAdd(&discovery.EndpointSlice{}) p.OnEndpointSliceAdd(&discovery.EndpointSlice{})