mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 03:57:41 +00:00
Merge pull request #115019 from Nordix/nodeip-as-lbip
Allow node addresses to be used as loadBalancer addresses for proxy-mode=ipvs
This commit is contained in:
commit
51edcd963e
@ -40,4 +40,9 @@ type NetLinkHandle interface {
|
|||||||
// Only the addresses of the current family are returned.
|
// Only the addresses of the current family are returned.
|
||||||
// IPv6 link-local and loopback addresses are excluded.
|
// IPv6 link-local and loopback addresses are excluded.
|
||||||
GetLocalAddresses(dev string) (sets.Set[string], error)
|
GetLocalAddresses(dev string) (sets.Set[string], error)
|
||||||
|
// GetAllLocalAddressesExcept return all local addresses on the node, except from the passed dev.
|
||||||
|
// This is not the same as to take the diff between GetAllLocalAddresses and GetLocalAddresses
|
||||||
|
// since an address can be assigned to many interfaces. This problem raised
|
||||||
|
// https://github.com/kubernetes/kubernetes/issues/114815
|
||||||
|
GetAllLocalAddressesExcept(dev string) (sets.Set[string], error)
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
|
||||||
netutils "k8s.io/utils/net"
|
netutils "k8s.io/utils/net"
|
||||||
|
|
||||||
@ -164,3 +165,30 @@ func (h *netlinkHandle) isValidForSet(ip net.IP) bool {
|
|||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetAllLocalAddressesExcept return all local addresses on the node,
|
||||||
|
// except from the passed dev. This is not the same as to take the
|
||||||
|
// diff between GetAllLocalAddresses and GetLocalAddresses since an
|
||||||
|
// address can be assigned to many interfaces. This problem raised
|
||||||
|
// https://github.com/kubernetes/kubernetes/issues/114815
|
||||||
|
func (h *netlinkHandle) GetAllLocalAddressesExcept(dev string) (sets.Set[string], error) {
|
||||||
|
ifaces, err := net.Interfaces()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var addr []net.Addr
|
||||||
|
for _, iface := range ifaces {
|
||||||
|
if iface.Name == dev {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
ifadr, err := iface.Addrs()
|
||||||
|
if err != nil {
|
||||||
|
// This may happen if the interface was deleted. Ignore
|
||||||
|
// but log the error.
|
||||||
|
klog.ErrorS(err, "Reading addresses", "interface", iface.Name)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
addr = append(addr, ifadr...)
|
||||||
|
}
|
||||||
|
return utilproxy.AddressSet(h.isValidForSet, addr), nil
|
||||||
|
}
|
||||||
|
@ -71,6 +71,11 @@ func (h *netlinkHandle) GetLocalAddresses(dev string) (sets.Set[string], error)
|
|||||||
return nil, fmt.Errorf("netlink is not supported in this platform")
|
return nil, fmt.Errorf("netlink is not supported in this platform")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetAllLocalAddressesExcept is part of interface.
|
||||||
|
func (h *netlinkHandle) GetAllLocalAddressesExcept(dev string) (sets.Set[string], error) {
|
||||||
|
return nil, fmt.Errorf("netlink is not supported in this platform")
|
||||||
|
}
|
||||||
|
|
||||||
// Must match the one in proxier_test.go
|
// Must match the one in proxier_test.go
|
||||||
func (h *netlinkHandle) isValidForSet(ip net.IP) bool {
|
func (h *netlinkHandle) isValidForSet(ip net.IP) bool {
|
||||||
return false
|
return false
|
||||||
|
@ -990,11 +990,10 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
klog.ErrorS(err, "Error listing addresses binded to dummy interface")
|
klog.ErrorS(err, "Error listing addresses binded to dummy interface")
|
||||||
}
|
}
|
||||||
// nodeAddressSet All addresses *except* those on the dummy interface
|
// nodeAddressSet All addresses *except* those on the dummy interface
|
||||||
nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddresses()
|
nodeAddressSet, err := proxier.netlinkHandle.GetAllLocalAddressesExcept(defaultDummyDevice)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.ErrorS(err, "Error listing node addresses")
|
klog.ErrorS(err, "Error listing node addresses")
|
||||||
}
|
}
|
||||||
nodeAddressSet = nodeAddressSet.Difference(alreadyBoundAddrs)
|
|
||||||
|
|
||||||
hasNodePort := false
|
hasNodePort := false
|
||||||
for _, svc := range proxier.svcPortMap {
|
for _, svc := range proxier.svcPortMap {
|
||||||
@ -1170,9 +1169,13 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
if proxier.ipvsScheduler == "mh" {
|
if proxier.ipvsScheduler == "mh" {
|
||||||
serv.Flags |= utilipvs.FlagSourceHash
|
serv.Flags |= utilipvs.FlagSourceHash
|
||||||
}
|
}
|
||||||
if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
|
// We must not add the address to the dummy device if it exist on another interface
|
||||||
|
shouldBind := !nodeAddressSet.Has(serv.Address.String())
|
||||||
|
if err := proxier.syncService(svcPortNameString, serv, shouldBind, alreadyBoundAddrs); err == nil {
|
||||||
activeIPVSServices.Insert(serv.String())
|
activeIPVSServices.Insert(serv.String())
|
||||||
|
if shouldBind {
|
||||||
activeBindAddrs.Insert(serv.Address.String())
|
activeBindAddrs.Insert(serv.Address.String())
|
||||||
|
}
|
||||||
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
|
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
|
||||||
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
|
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
|
||||||
}
|
}
|
||||||
@ -1273,9 +1276,13 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
if proxier.ipvsScheduler == "mh" {
|
if proxier.ipvsScheduler == "mh" {
|
||||||
serv.Flags |= utilipvs.FlagSourceHash
|
serv.Flags |= utilipvs.FlagSourceHash
|
||||||
}
|
}
|
||||||
if err := proxier.syncService(svcPortNameString, serv, true, alreadyBoundAddrs); err == nil {
|
// We must not add the address to the dummy device if it exist on another interface
|
||||||
|
shouldBind := !nodeAddressSet.Has(serv.Address.String())
|
||||||
|
if err := proxier.syncService(svcPortNameString, serv, shouldBind, alreadyBoundAddrs); err == nil {
|
||||||
activeIPVSServices.Insert(serv.String())
|
activeIPVSServices.Insert(serv.String())
|
||||||
|
if shouldBind {
|
||||||
activeBindAddrs.Insert(serv.Address.String())
|
activeBindAddrs.Insert(serv.Address.String())
|
||||||
|
}
|
||||||
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
|
if err := proxier.syncEndpoint(svcPortName, svcInfo.ExternalPolicyLocal(), serv); err != nil {
|
||||||
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
|
klog.ErrorS(err, "Failed to sync endpoint for service", "servicePortName", svcPortName, "virtualServer", serv)
|
||||||
}
|
}
|
||||||
|
@ -140,6 +140,21 @@ func (h *FakeNetlinkHandle) GetAllLocalAddresses() (sets.Set[string], error) {
|
|||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *FakeNetlinkHandle) GetAllLocalAddressesExcept(dev string) (sets.Set[string], error) {
|
||||||
|
res := sets.New[string]()
|
||||||
|
for linkName := range h.localAddresses {
|
||||||
|
if linkName == dev {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, addr := range h.localAddresses[linkName] {
|
||||||
|
if h.isValidForSet(addr) {
|
||||||
|
res.Insert(addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SetLocalAddresses set IP addresses to the given interface device. It's not part of interface.
|
// SetLocalAddresses set IP addresses to the given interface device. It's not part of interface.
|
||||||
func (h *FakeNetlinkHandle) SetLocalAddresses(dev string, ips ...string) error {
|
func (h *FakeNetlinkHandle) SetLocalAddresses(dev string, ips ...string) error {
|
||||||
if h.localAddresses == nil {
|
if h.localAddresses == nil {
|
||||||
|
@ -23,6 +23,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/proxy/ipvs"
|
"k8s.io/kubernetes/pkg/proxy/ipvs"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// (I am unsure if this test has any value since it only tests the fake implementation)
|
||||||
func TestSetGetLocalAddresses(t *testing.T) {
|
func TestSetGetLocalAddresses(t *testing.T) {
|
||||||
fake := NewFakeNetlinkHandle(false)
|
fake := NewFakeNetlinkHandle(false)
|
||||||
_ = ipvs.NetLinkHandle(fake) // Ensure that the interface is honored
|
_ = ipvs.NetLinkHandle(fake) // Ensure that the interface is honored
|
||||||
@ -43,10 +44,15 @@ func TestSetGetLocalAddresses(t *testing.T) {
|
|||||||
if !addr.Equal(expected) {
|
if !addr.Equal(expected) {
|
||||||
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
||||||
}
|
}
|
||||||
fake.SetLocalAddresses("kube-ipvs0", "4.3.2.1")
|
fake.SetLocalAddresses("kube-ipvs0", "1.2.3.4", "4.3.2.1")
|
||||||
addr, _ = fake.GetAllLocalAddresses()
|
addr, _ = fake.GetAllLocalAddresses()
|
||||||
expected = sets.New("1.2.3.4", "4.3.2.1")
|
expected = sets.New("1.2.3.4", "4.3.2.1")
|
||||||
if !addr.Equal(expected) {
|
if !addr.Equal(expected) {
|
||||||
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
||||||
}
|
}
|
||||||
|
addr, _ = fake.GetAllLocalAddressesExcept("kube-ipvs0")
|
||||||
|
expected = sets.New("1.2.3.4")
|
||||||
|
if !addr.Equal(expected) {
|
||||||
|
t.Errorf("Unexpected mismatch, expected: %v, got: %v", expected, addr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user