From 4f75949bcbd17ea6e61fb1a652090aaad11fd982 Mon Sep 17 00:00:00 2001 From: Son Dinh Date: Tue, 31 Jan 2023 21:55:45 +1100 Subject: [PATCH] Ipvs: Add a new FlagSourceHash to "mh" distribution method. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit With the flag, ipvs uses both source IP and source port (instead of only source IP) to distribute new connections evenly across endpoints, which avoids sending all connections from the same client (i.e. same source IP) to one single endpoint. Users can explicitly set sessionAffinity in the service spec to keep all connections from a source IP on the same endpoint if needed. Change-Id: I42f950c0840ac06a4ee68a7bbdeab0fc5505c71f --- pkg/proxy/ipvs/proxier.go | 16 ++++++++++++++++ pkg/util/ipvs/ipvs.go | 2 ++ pkg/util/ipvs/ipvs_linux.go | 4 ++-- pkg/util/ipvs/ipvs_linux_test.go | 28 ++++++++++++++++++++++++++-- 4 files changed, 46 insertions(+), 4 deletions(-) diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 21b4e550422..48c8c391b87 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1180,6 +1180,10 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } + // Set the source hash flag needed for the distribution method "mh" + if proxier.ipvsScheduler == "mh" { + serv.Flags |= utilipvs.FlagSourceHash + } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true @@ -1233,6 +1237,10 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } + // Set the source hash flag needed for the distribution method "mh" + if proxier.ipvsScheduler == "mh" { + serv.Flags |= utilipvs.FlagSourceHash + } if err := 
proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true @@ -1333,6 +1341,10 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } + // Set the source hash flag needed for the distribution method "mh" + if proxier.ipvsScheduler == "mh" { + serv.Flags |= utilipvs.FlagSourceHash + } if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true @@ -1476,6 +1488,10 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } + // Set the source hash flag needed for the distribution method "mh" + if proxier.ipvsScheduler == "mh" { + serv.Flags |= utilipvs.FlagSourceHash + } // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true diff --git a/pkg/util/ipvs/ipvs.go b/pkg/util/ipvs/ipvs.go index 2bb08e430a5..c9b2ff4b332 100644 --- a/pkg/util/ipvs/ipvs.go +++ b/pkg/util/ipvs/ipvs.go @@ -67,6 +67,8 @@ const ( FlagPersistent = 0x1 // FlagHashed specify IPVS service hash flag FlagHashed = 0x2 + // FlagSourceHash enables IPVS service hashing on source port and source IP + FlagSourceHash = 0x10 ) // Equal check the equality of virtual server. diff --git a/pkg/util/ipvs/ipvs_linux.go b/pkg/util/ipvs/ipvs_linux.go index 707cbfa7b41..d6b94741521 100644 --- a/pkg/util/ipvs/ipvs_linux.go +++ b/pkg/util/ipvs/ipvs_linux.go @@ -225,9 +225,9 @@ func toVirtualServer(svc *libipvs.Service) (*VirtualServer, error) { Timeout: svc.Timeout, } - // Test Flags >= 0x2, valid Flags ranges [0x2, 0x3] + // Test FlagHashed (0x2). 
A valid flag must include FlagHashed if svc.Flags&FlagHashed == 0 { - return nil, fmt.Errorf("Flags of successfully created IPVS service should be >= %d since every service is hashed into the service table", FlagHashed) + return nil, fmt.Errorf("Flags of successfully created IPVS service should enable the flag (%x) since every service is hashed into the service table", FlagHashed) } // Sub Flags to 0x2 // 011 -> 001, 010 -> 000 diff --git a/pkg/util/ipvs/ipvs_linux_test.go b/pkg/util/ipvs/ipvs_linux_test.go index c3a68741d2e..e604cefbea6 100644 --- a/pkg/util/ipvs/ipvs_linux_test.go +++ b/pkg/util/ipvs/ipvs_linux_test.go @@ -44,7 +44,7 @@ func Test_toVirtualServer(t *testing.T) { }, VirtualServer{}, true, - fmt.Sprintf("IPVS Service Flags should be >= %d, got 0x0", FlagHashed), + fmt.Sprintf("IPVS Service Flags should include %x, got 0x0", FlagHashed), }, { libipvs.Service{ @@ -52,7 +52,7 @@ func Test_toVirtualServer(t *testing.T) { }, VirtualServer{}, true, - fmt.Sprintf("IPVS Service Flags should be >= %d, got 0x1", FlagHashed), + fmt.Sprintf("IPVS Service Flags should include %x, got 0x1", FlagHashed), }, { libipvs.Service{ @@ -150,6 +150,30 @@ func Test_toVirtualServer(t *testing.T) { false, "", }, + { + libipvs.Service{ + Protocol: 0, + Port: 0, + FWMark: 0, + SchedName: "mh", + Flags: uint32(FlagPersistent + FlagHashed + FlagSourceHash), + Timeout: 0, + Netmask: 0xffffffff, + AddressFamily: unix.AF_INET, + Address: netutils.ParseIPSloppy("1.2.3.4"), + PEName: "", + }, + VirtualServer{ + Address: netutils.ParseIPSloppy("1.2.3.4"), + Protocol: "", + Port: 0, + Scheduler: "mh", + Flags: ServiceFlags(FlagPersistent + FlagSourceHash), + Timeout: 0, + }, + false, + "", + }, { libipvs.Service{ Protocol: unix.IPPROTO_SCTP,