Ipvs: Add a new FlagSourceHash to "mh" distribution method.

With the flag, ipvs uses both source IP and source port (instead of
only source IP) to distribute new connections evently to endpoints
that avoids sending all connections from the same client (i.e. same
source IP) to one single endpoint.

User can explicitly set sessionAffinity in service spec to keep all
connections from a source IP to end up on the same endpoint if needed.

Change-Id: I42f950c0840ac06a4ee68a7bbdeab0fc5505c71f
This commit is contained in:
Son Dinh 2023-01-31 21:55:45 +11:00 committed by Son Dinh
parent cf14b50b0d
commit 4f75949bcb
4 changed files with 46 additions and 4 deletions

View File

@ -1180,6 +1180,10 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
// Set the source hash flag needed for the distribution method "mh"
if proxier.ipvsScheduler == "mh" {
serv.Flags |= utilipvs.FlagSourceHash
}
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
@ -1233,6 +1237,10 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
// Set the source hash flag needed for the distribution method "mh"
if proxier.ipvsScheduler == "mh" {
serv.Flags |= utilipvs.FlagSourceHash
}
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
@ -1333,6 +1341,10 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
// Set the source hash flag needed for the distribution method "mh"
if proxier.ipvsScheduler == "mh" {
serv.Flags |= utilipvs.FlagSourceHash
}
if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true
activeBindAddrs[serv.Address.String()] = true
@ -1476,6 +1488,10 @@ func (proxier *Proxier) syncProxyRules() {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds())
}
// Set the source hash flag needed for the distribution method "mh"
if proxier.ipvsScheduler == "mh" {
serv.Flags |= utilipvs.FlagSourceHash
}
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil {
activeIPVSServices[serv.String()] = true

View File

@ -67,6 +67,8 @@ const (
FlagPersistent = 0x1
// FlagHashed specify IPVS service hash flag
FlagHashed = 0x2
// FlagSourceHash enables IPVS service hashing on source port and source IP
FlagSourceHash = 0x10
)
// Equal check the equality of virtual server.

View File

@ -225,9 +225,9 @@ func toVirtualServer(svc *libipvs.Service) (*VirtualServer, error) {
Timeout: svc.Timeout,
}
// Test Flags >= 0x2, valid Flags ranges [0x2, 0x3]
// Test FlagHashed (0x2). A valid flag must include FlagHashed
if svc.Flags&FlagHashed == 0 {
return nil, fmt.Errorf("Flags of successfully created IPVS service should be >= %d since every service is hashed into the service table", FlagHashed)
return nil, fmt.Errorf("Flags of successfully created IPVS service should enable the flag (%x) since every service is hashed into the service table", FlagHashed)
}
// Sub Flags to 0x2
// 011 -> 001, 010 -> 000

View File

@ -44,7 +44,7 @@ func Test_toVirtualServer(t *testing.T) {
},
VirtualServer{},
true,
fmt.Sprintf("IPVS Service Flags should be >= %d, got 0x0", FlagHashed),
fmt.Sprintf("IPVS Service Flags should include %x, got 0x0", FlagHashed),
},
{
libipvs.Service{
@ -52,7 +52,7 @@ func Test_toVirtualServer(t *testing.T) {
},
VirtualServer{},
true,
fmt.Sprintf("IPVS Service Flags should be >= %d, got 0x1", FlagHashed),
fmt.Sprintf("IPVS Service Flags should include %x, got 0x1", FlagHashed),
},
{
libipvs.Service{
@ -150,6 +150,30 @@ func Test_toVirtualServer(t *testing.T) {
false,
"",
},
{
libipvs.Service{
Protocol: 0,
Port: 0,
FWMark: 0,
SchedName: "mh",
Flags: uint32(FlagPersistent + FlagHashed + FlagSourceHash),
Timeout: 0,
Netmask: 0xffffffff,
AddressFamily: unix.AF_INET,
Address: netutils.ParseIPSloppy("1.2.3.4"),
PEName: "",
},
VirtualServer{
Address: netutils.ParseIPSloppy("1.2.3.4"),
Protocol: "",
Port: 0,
Scheduler: "mh",
Flags: ServiceFlags(FlagPersistent + FlagSourceHash),
Timeout: 0,
},
false,
"",
},
{
libipvs.Service{
Protocol: unix.IPPROTO_SCTP,