diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 21b4e550422..48c8c391b87 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1180,6 +1180,10 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } + // Set the source hash flag needed for the distribution method "mh" + if proxier.ipvsScheduler == "mh" { + serv.Flags |= utilipvs.FlagSourceHash + } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true @@ -1233,6 +1237,10 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } + // Set the source hash flag needed for the distribution method "mh" + if proxier.ipvsScheduler == "mh" { + serv.Flags |= utilipvs.FlagSourceHash + } if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true @@ -1333,6 +1341,10 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } + // Set the source hash flag needed for the distribution method "mh" + if proxier.ipvsScheduler == "mh" { + serv.Flags |= utilipvs.FlagSourceHash + } if err := proxier.syncService(svcPortNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true @@ -1476,6 +1488,10 @@ func (proxier *Proxier) syncProxyRules() { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } + // Set the source hash flag needed for the distribution method "mh" + if proxier.ipvsScheduler == "mh" { + serv.Flags |= utilipvs.FlagSourceHash + } // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. if err := proxier.syncService(svcPortNameString, serv, false, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true diff --git a/pkg/util/ipvs/ipvs.go b/pkg/util/ipvs/ipvs.go index 2bb08e430a5..c9b2ff4b332 100644 --- a/pkg/util/ipvs/ipvs.go +++ b/pkg/util/ipvs/ipvs.go @@ -67,6 +67,8 @@ const ( FlagPersistent = 0x1 // FlagHashed specify IPVS service hash flag FlagHashed = 0x2 + // FlagSourceHash enables IPVS service hashing on source port and source IP + FlagSourceHash = 0x10 ) // Equal check the equality of virtual server. diff --git a/pkg/util/ipvs/ipvs_linux.go b/pkg/util/ipvs/ipvs_linux.go index 707cbfa7b41..d6b94741521 100644 --- a/pkg/util/ipvs/ipvs_linux.go +++ b/pkg/util/ipvs/ipvs_linux.go @@ -225,9 +225,9 @@ func toVirtualServer(svc *libipvs.Service) (*VirtualServer, error) { Timeout: svc.Timeout, } - // Test Flags >= 0x2, valid Flags ranges [0x2, 0x3] + // Test FlagHashed (0x2). A valid flag must include FlagHashed if svc.Flags&FlagHashed == 0 { - return nil, fmt.Errorf("Flags of successfully created IPVS service should be >= %d since every service is hashed into the service table", FlagHashed) + return nil, fmt.Errorf("Flags of successfully created IPVS service should enable the flag (%x) since every service is hashed into the service table", FlagHashed) } // Sub Flags to 0x2 // 011 -> 001, 010 -> 000 diff --git a/pkg/util/ipvs/ipvs_linux_test.go b/pkg/util/ipvs/ipvs_linux_test.go index c3a68741d2e..e604cefbea6 100644 --- a/pkg/util/ipvs/ipvs_linux_test.go +++ b/pkg/util/ipvs/ipvs_linux_test.go @@ -44,7 +44,7 @@ func Test_toVirtualServer(t *testing.T) { }, VirtualServer{}, true, - fmt.Sprintf("IPVS Service Flags should be >= %d, got 0x0", FlagHashed), + fmt.Sprintf("IPVS Service Flags should include %x, got 0x0", FlagHashed), }, { libipvs.Service{ @@ -52,7 +52,7 @@ func Test_toVirtualServer(t *testing.T) { }, VirtualServer{}, true, - fmt.Sprintf("IPVS Service Flags should be >= %d, got 0x1", FlagHashed), + fmt.Sprintf("IPVS Service Flags should include %x, got 0x1", FlagHashed), }, { libipvs.Service{ @@ -150,6 +150,30 @@ func Test_toVirtualServer(t *testing.T) { false, "", }, + { + libipvs.Service{ + Protocol: 0, + Port: 0, + FWMark: 0, + SchedName: "mh", + Flags: uint32(FlagPersistent + FlagHashed + FlagSourceHash), + Timeout: 0, + Netmask: 0xffffffff, + AddressFamily: unix.AF_INET, + Address: netutils.ParseIPSloppy("1.2.3.4"), + PEName: "", + }, + VirtualServer{ + Address: netutils.ParseIPSloppy("1.2.3.4"), + Protocol: "", + Port: 0, + Scheduler: "mh", + Flags: ServiceFlags(FlagPersistent + FlagSourceHash), + Timeout: 0, + }, + false, + "", + }, { libipvs.Service{ Protocol: unix.IPPROTO_SCTP,