Graduate EndpointSliceProxying and WindowsEndpointSliceProxying Gates

This commit is contained in:
Swetha Repakula
2021-07-02 15:49:19 -07:00
parent d7123a6524
commit 0a42f7b989
13 changed files with 1498 additions and 1850 deletions

View File

@@ -111,7 +111,7 @@ func (hns fakeHNS) deleteLoadBalancer(hnsID string) error {
return nil
}
func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string, endpointSliceEnabled bool) *Proxier {
func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clusterCIDR string, hostname string, nodeIP net.IP, networkType string) *Proxier {
sourceVip := "192.168.1.2"
hnsNetworkInfo := &hnsNetworkInfo{
id: strings.ToUpper(guid),
@@ -134,7 +134,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
}
serviceChanges := proxy.NewServiceChangeTracker(proxier.newServiceInfo, v1.IPv4Protocol, nil, proxier.serviceMapChange)
endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, endpointSliceEnabled, proxier.endpointsMapChange)
endpointChangeTracker := proxy.NewEndpointChangeTracker(hostname, proxier.newEndpointInfo, v1.IPv4Protocol, nil, proxier.endpointsMapChange)
proxier.endpointsChanges = endpointChangeTracker
proxier.serviceChanges = serviceChanges
@@ -143,7 +143,7 @@ func NewFakeProxier(syncPeriod time.Duration, minSyncPeriod time.Duration, clust
func TestCreateServiceVip(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, false)
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil {
t.Error()
}
@@ -178,7 +178,6 @@ func TestCreateServiceVip(t *testing.T) {
}}
}),
)
makeEndpointsMap(proxier)
proxier.setInitialized(true)
proxier.syncProxyRules()
@@ -199,7 +198,7 @@ func TestCreateServiceVip(t *testing.T) {
func TestCreateRemoteEndpointOverlay(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, false)
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil {
t.Error()
}
@@ -212,6 +211,7 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
Port: "p80",
Protocol: v1.ProtocolTCP,
}
tcpProtocol := v1.ProtocolTCP
makeServiceMap(proxier,
makeTestService(svcPortName.Namespace, svcPortName.Name, func(svc *v1.Service) {
@@ -225,17 +225,16 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}},
populateEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
Port: utilpointer.Int32(int32(svcPort)),
Protocol: &tcpProtocol,
}}
}),
)
@@ -264,18 +263,19 @@ func TestCreateRemoteEndpointOverlay(t *testing.T) {
func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge")
if proxier == nil {
t.Error()
}
tcpProtocol := v1.ProtocolTCP
svcIP := "10.20.30.41"
svcPort := 80
svcNodePort := 3001
svcPortName := proxy.ServicePortName{
NamespacedName: makeNSN("ns1", "svc1"),
Port: "p80",
Protocol: v1.ProtocolTCP,
Protocol: tcpProtocol,
}
makeServiceMap(proxier,
@@ -285,22 +285,21 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
svc.Spec.Ports = []v1.ServicePort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
Protocol: tcpProtocol,
NodePort: int32(svcNodePort),
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}},
populateEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.String(svcPortName.Port),
Port: utilpointer.Int32(int32(svcPort)),
Protocol: &tcpProtocol,
}}
}),
)
@@ -327,7 +326,8 @@ func TestCreateRemoteEndpointL2Bridge(t *testing.T) {
}
func TestSharedRemoteEndpointDelete(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
tcpProtocol := v1.ProtocolTCP
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge")
if proxier == nil {
t.Error()
}
@@ -372,29 +372,27 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
}},
populateEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName1.Namespace, svcPortName1.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName1.Port),
Port: utilpointer.Int32(int32(svcPort1)),
Protocol: &tcpProtocol,
}}
}),
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
}},
makeTestEndpointSlice(svcPortName2.Namespace, svcPortName2.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName2.Port),
Port: utilpointer.Int32(int32(svcPort2)),
Protocol: &tcpProtocol,
}}
}),
)
@@ -433,17 +431,16 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
}),
)
deleteEndpoints(proxier,
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
}},
deleteEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName2.Namespace, svcPortName2.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName2.Port),
Port: utilpointer.Int32(int32(svcPort2)),
Protocol: &tcpProtocol,
}}
}),
)
@@ -472,7 +469,7 @@ func TestSharedRemoteEndpointDelete(t *testing.T) {
}
func TestSharedRemoteEndpointUpdate(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge", false)
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), "L2Bridge")
if proxier == nil {
t.Error()
}
@@ -518,29 +515,28 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
}},
tcpProtocol := v1.ProtocolTCP
populateEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName1.Namespace, svcPortName1.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName1.Port),
Port: utilpointer.Int32(int32(svcPort1)),
Protocol: &tcpProtocol,
}}
}),
makeTestEndpoints(svcPortName2.Namespace, svcPortName2.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName2.Port,
Port: int32(svcPort2),
Protocol: v1.ProtocolTCP,
}},
makeTestEndpointSlice(svcPortName2.Namespace, svcPortName2.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName2.Port),
Port: utilpointer.Int32(int32(svcPort2)),
Protocol: &tcpProtocol,
}}
}),
)
@@ -589,40 +585,37 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
}}
}))
proxier.OnEndpointsUpdate(
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
}},
proxier.OnEndpointSliceUpdate(
makeTestEndpointSlice(svcPortName1.Namespace, svcPortName1.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName1.Port),
Port: utilpointer.Int32(int32(svcPort1)),
Protocol: &tcpProtocol,
}}
}),
makeTestEndpoints(svcPortName1.Namespace, svcPortName1.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{
{
Name: svcPortName1.Port,
Port: int32(svcPort1),
Protocol: v1.ProtocolTCP,
},
{
Name: "p443",
Port: int32(443),
Protocol: v1.ProtocolTCP,
}},
makeTestEndpointSlice(svcPortName1.Namespace, svcPortName1.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName1.Port),
Port: utilpointer.Int32(int32(svcPort1)),
Protocol: &tcpProtocol,
},
{
Name: utilpointer.StringPtr("p443"),
Port: utilpointer.Int32(int32(443)),
Protocol: &tcpProtocol,
}}
}))
proxier.mu.Lock()
proxier.endpointsSynced = true
proxier.endpointSlicesSynced = true
proxier.mu.Unlock()
proxier.setInitialized(true)
@@ -650,7 +643,8 @@ func TestSharedRemoteEndpointUpdate(t *testing.T) {
}
func TestCreateLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, false)
tcpProtocol := v1.ProtocolTCP
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil {
t.Error()
}
@@ -676,17 +670,16 @@ func TestCreateLoadBalancer(t *testing.T) {
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}},
populateEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
Port: utilpointer.Int32(int32(svcPort)),
Protocol: &tcpProtocol,
}}
}),
)
@@ -709,7 +702,7 @@ func TestCreateLoadBalancer(t *testing.T) {
func TestCreateDsrLoadBalancer(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, false)
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil {
t.Error()
}
@@ -736,17 +729,17 @@ func TestCreateDsrLoadBalancer(t *testing.T) {
}}
}),
)
makeEndpointsMap(proxier,
makeTestEndpoints(svcPortName.Namespace, svcPortName.Name, func(ept *v1.Endpoints) {
ept.Subsets = []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{
IP: epIpAddressRemote,
}},
Ports: []v1.EndpointPort{{
Name: svcPortName.Port,
Port: int32(svcPort),
Protocol: v1.ProtocolTCP,
}},
tcpProtocol := v1.ProtocolTCP
populateEndpointSlices(proxier,
makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, 1, func(eps *discovery.EndpointSlice) {
eps.AddressType = discovery.AddressTypeIPv4
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIpAddressRemote},
}}
eps.Ports = []discovery.EndpointPort{{
Name: utilpointer.StringPtr(svcPortName.Port),
Port: utilpointer.Int32(int32(svcPort)),
Protocol: &tcpProtocol,
}}
}),
)
@@ -771,7 +764,7 @@ func TestCreateDsrLoadBalancer(t *testing.T) {
func TestEndpointSlice(t *testing.T) {
syncPeriod := 30 * time.Second
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY, true)
proxier := NewFakeProxier(syncPeriod, syncPeriod, clusterCIDR, "testhost", net.ParseIP("10.0.0.1"), NETWORK_TYPE_OVERLAY)
if proxier == nil {
t.Error()
}
@@ -908,33 +901,30 @@ func makeTestService(namespace, name string, svcFunc func(*v1.Service)) *v1.Serv
return svc
}
func makeEndpointsMap(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
for i := range allEndpoints {
proxier.OnEndpointsAdd(allEndpoints[i])
func deleteEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) {
for i := range allEndpointSlices {
proxier.OnEndpointSliceDelete(allEndpointSlices[i])
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointsSynced = true
proxier.endpointSlicesSynced = true
}
func deleteEndpoints(proxier *Proxier, allEndpoints ...*v1.Endpoints) {
for i := range allEndpoints {
proxier.OnEndpointsDelete(allEndpoints[i])
func populateEndpointSlices(proxier *Proxier, allEndpointSlices ...*discovery.EndpointSlice) {
for i := range allEndpointSlices {
proxier.OnEndpointSliceAdd(allEndpointSlices[i])
}
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.endpointsSynced = true
}
func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
ept := &v1.Endpoints{
func makeTestEndpointSlice(namespace, name string, sliceNum int, epsFunc func(*discovery.EndpointSlice)) *discovery.EndpointSlice {
eps := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: fmt.Sprintf("%s-%d", name, sliceNum),
Namespace: namespace,
Labels: map[string]string{discovery.LabelServiceName: name},
},
}
eptFunc(ept)
return ept
epsFunc(eps)
return eps
}