diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 40e50680f5f..ad2f74449cb 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -170,6 +170,11 @@ type Proxier struct { // lbWhiteListIPSet is the hash:ip,port,net type ipset where stores all service load balancer ingress IP:Port,sourceCIDR pair, any packets // from the source CIDR visit ingress IP:Port can pass through. lbWhiteListCIDRSet *IPSet + // Values are as a parameter to select the interfaces where nodeport works. + nodePortAddresses []string + // networkInterfacer defines an interface for several net library functions. + // Inject for test purpose. + networkInterfacer utilproxy.NetworkInterfacer } // IPGetter helps get node network interface IP @@ -253,6 +258,7 @@ func NewProxier(ipt utiliptables.Interface, recorder record.EventRecorder, healthzServer healthcheck.HealthzUpdater, scheduler string, + nodePortAddresses []string, ) (*Proxier, error) { // Set the route_localnet sysctl we need for if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil { @@ -336,6 +342,8 @@ func NewProxier(ipt utiliptables.Interface, lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6), nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), + nodePortAddresses: nodePortAddresses, + networkInterfacer: utilproxy.RealNetwork{}, } burstSyncs := 2 glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) @@ -1134,6 +1142,7 @@ func (proxier *Proxier) syncProxyRules() { } var nodePortSet *IPSet switch protocol { + case "tcp": nodePortSet = proxier.nodePortSetTCP case "udp": @@ -1152,31 +1161,43 @@ func (proxier *Proxier) syncProxyRules() { } // Build ipvs kernel routes for each node ip address - nodeIPs, err := proxier.ipGetter.NodeIPs() + nodeIPs := make([]net.IP, 0) + addresses, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer) if err != nil { - glog.Errorf("Failed to get node IP, err: %v", err) - } else { - for _, nodeIP := range nodeIPs { - // ipvs call - serv := &utilipvs.VirtualServer{ - Address: nodeIP, - Port: uint16(svcInfo.nodePort), - Protocol: string(svcInfo.protocol), - Scheduler: proxier.ipvsScheduler, - } - if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { - serv.Flags |= utilipvs.FlagPersistent - serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds) - } - // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. - if err := proxier.syncService(svcNameString, serv, false); err == nil { - activeIPVSServices[serv.String()] = true - if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil { - glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) - } - } else { - glog.Errorf("Failed to sync service: %v, err: %v", serv, err) + glog.Errorf("Failed to get node ip address matching nodeport cidr") + continue + } + for address := range addresses { + if !utilproxy.IsZeroCIDR(address) { + nodeIPs = append(nodeIPs, net.ParseIP(address)) + continue + } + // zero cidr + nodeIPs, err = proxier.ipGetter.NodeIPs() + if err != nil { + glog.Errorf("Failed to list all node IPs from host, err: %v", err) + } + } + for _, nodeIP := range nodeIPs { + // ipvs call + serv := &utilipvs.VirtualServer{ + Address: nodeIP, + Port: uint16(svcInfo.nodePort), + Protocol: string(svcInfo.protocol), + Scheduler: proxier.ipvsScheduler, + } + if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { + serv.Flags |= utilipvs.FlagPersistent + serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds) + } + // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`. + if err := proxier.syncService(svcNameString, serv, false); err == nil { + activeIPVSServices[serv.String()] = true + if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil { + glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } + } else { + glog.Errorf("Failed to sync service: %v, err: %v", serv, err) } } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 2f1e8418a08..a26e7a38b49 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" proxyutil "k8s.io/kubernetes/pkg/proxy/util" + proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" utilipset "k8s.io/kubernetes/pkg/util/ipset" ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing" utiliptables "k8s.io/kubernetes/pkg/util/iptables" @@ -146,6 +147,8 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false), nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), + nodePortAddresses: make([]string, 0), + networkInterfacer: proxyutiltest.NewFakeNetwork(), } } @@ -386,6 +389,8 @@ func TestNodePort(t *testing.T) { }), ) + fp.nodePortAddresses = []string{"0.0.0.0/0"} + fp.syncProxyRules() // Check ipvs service and destinations @@ -445,6 +450,8 @@ func TestNodePortNoEndpoint(t *testing.T) { ) makeEndpointsMap(fp) + fp.nodePortAddresses = []string{"0.0.0.0/0"} + fp.syncProxyRules() // Check ipvs service and destinations @@ -847,15 +854,23 @@ func TestOnlyLocalNodePorts(t *testing.T) { }), ) + itf := net.Interface{Index: 0, MTU: 0, Name: "eth0", HardwareAddr: nil, Flags: 0} + addrs := []net.Addr{proxyutiltest.AddrStruct{Val: "100.101.102.103/24"}} + itf1 := net.Interface{Index: 1, MTU: 0, Name: "eth1", HardwareAddr: nil, Flags: 0} + addrs1 := []net.Addr{proxyutiltest.AddrStruct{Val: "2001:db8::0/64"}} + fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf, addrs) + fp.networkInterfacer.(*proxyutiltest.FakeNetwork).AddInterfaceAddr(&itf1, addrs1) + fp.nodePortAddresses = []string{"100.101.102.0/24", "2001:db8::0/64"} + fp.syncProxyRules() - // Expect 2 services and 1 destination + // Expect 3 services and 1 destination services, err := ipvs.GetVirtualServers() if err != nil { t.Errorf("Failed to get ipvs services, err: %v", err) } - if len(services) != 2 { - t.Errorf("Expect 2 ipvs services, got %d", len(services)) + if len(services) != 3 { + t.Errorf("Expect 3 ipvs services, got %d", len(services)) } found := false for _, svc := range services { diff --git a/pkg/proxy/util/interface.go b/pkg/proxy/util/interface.go deleted file mode 100644 index 2c6f02c9134..00000000000 --- a/pkg/proxy/util/interface.go +++ /dev/null @@ -1,83 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package util - -import ( - "net" -) - -// NetworkInterface defines an interface for several net library functions. Production -// code will forward to net library functions, and unit tests will override the methods -// for testing purposes. -type NetworkInterface interface { - Addrs(intf *net.Interface) ([]net.Addr, error) - Interfaces() ([]net.Interface, error) -} - -// RealNetwork implements the NetworkInterface interface for production code, just -// wrapping the underlying net library function calls. -type RealNetwork struct{} - -// Addrs wraps net.Interface.Addrs() -func (_ RealNetwork) Addrs(intf *net.Interface) ([]net.Addr, error) { - return intf.Addrs() -} - -// Interfaces wraps net.Interfaces() -func (_ RealNetwork) Interfaces() ([]net.Interface, error) { - return net.Interfaces() -} - -// RealNetwork implements the NetworkInterface interface for production code, just -// wrapping the underlying net library function calls. -type FakeNetwork struct { - NetworkInterfaces []net.Interface - // The key of map Addrs is interface name - Address map[string][]net.Addr -} - -func NewFakeNetwork() *FakeNetwork { - return &FakeNetwork{ - NetworkInterfaces: make([]net.Interface, 0), - Address: make(map[string][]net.Addr), - } -} - -// AddInterfaceAddr create an interface and its associated addresses for FakeNetwork implementation. -func (f *FakeNetwork) AddInterfaceAddr(intf *net.Interface, addrs []net.Addr) { - f.NetworkInterfaces = append(f.NetworkInterfaces, *intf) - f.Address[intf.Name] = addrs -} - -// Addrs is part of FakeNetwork interface. -func (f *FakeNetwork) Addrs(intf *net.Interface) ([]net.Addr, error) { - return f.Address[intf.Name], nil -} - -// Interfaces is part of FakeNetwork interface. -func (f *FakeNetwork) Interfaces() ([]net.Interface, error) { - return f.NetworkInterfaces, nil -} - -type AddrStruct struct{ Val string } - -func (a AddrStruct) Network() string { - return a.Val -} -func (a AddrStruct) String() string { - return a.Val -}