[kube-proxy] Move ipv6 related funcs to utils pkg

This commit is contained in:
Zihong Zheng 2018-02-16 19:13:33 -08:00
parent b485f7b5b4
commit dfbec1a63a
9 changed files with 171 additions and 121 deletions

View File

@ -31,6 +31,7 @@ import (
iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables" iptablesproxy "k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/util/conntrack" "k8s.io/kubernetes/pkg/util/conntrack"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/utils/exec" "k8s.io/utils/exec"
) )
@ -165,7 +166,7 @@ func (hm *hostportManager) Add(id string, podPortMapping *PodPortMapping, natInt
// clean up opened host port if encounter any error // clean up opened host port if encounter any error
return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)}) return utilerrors.NewAggregate([]error{err, hm.closeHostports(hostportMappings)})
} }
isIpv6 := conntrack.IsIPv6(podPortMapping.IP) isIpv6 := utilnet.IsIPv6(podPortMapping.IP)
// Remove conntrack entries just after adding the new iptables rules. If the conntrack entry is removed along with // Remove conntrack entries just after adding the new iptables rules. If the conntrack entry is removed along with
// the IP tables rule, it can be the case that the packets received by the node after iptables rule removal will // the IP tables rule, it can be the case that the packets received by the node after iptables rule removal will

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/util/async" "k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack" "k8s.io/kubernetes/pkg/util/conntrack"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnet "k8s.io/kubernetes/pkg/util/net"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
utilversion "k8s.io/kubernetes/pkg/util/version" utilversion "k8s.io/kubernetes/pkg/util/version"
utilexec "k8s.io/utils/exec" utilexec "k8s.io/utils/exec"
@ -758,7 +759,7 @@ func (proxier *Proxier) syncProxyRules() {
glog.Errorf("Failed to cast serviceInfo %q", svcName.String()) glog.Errorf("Failed to cast serviceInfo %q", svcName.String())
continue continue
} }
isIPv6 := conntrack.IsIPv6(svcInfo.ClusterIP) isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
protocol := strings.ToLower(string(svcInfo.Protocol)) protocol := strings.ToLower(string(svcInfo.Protocol))
svcNameString := svcInfo.serviceNameString svcNameString := svcInfo.serviceNameString
hasEndpoints := len(proxier.endpointsMap[svcName]) > 0 hasEndpoints := len(proxier.endpointsMap[svcName]) > 0
@ -1223,7 +1224,7 @@ func (proxier *Proxier) syncProxyRules() {
break break
} }
// Ignore IP addresses with incorrect version // Ignore IP addresses with incorrect version
if isIPv6 && !conntrack.IsIPv6String(address) || !isIPv6 && conntrack.IsIPv6String(address) { if isIPv6 && !utilnet.IsIPv6String(address) || !isIPv6 && utilnet.IsIPv6String(address) {
glog.Errorf("IP address %s has incorrect IP version", address) glog.Errorf("IP address %s has incorrect IP version", address)
continue continue
} }

View File

@ -47,6 +47,7 @@ import (
utilipset "k8s.io/kubernetes/pkg/util/ipset" utilipset "k8s.io/kubernetes/pkg/util/ipset"
utiliptables "k8s.io/kubernetes/pkg/util/iptables" utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilipvs "k8s.io/kubernetes/pkg/util/ipvs"
utilnet "k8s.io/kubernetes/pkg/util/net"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
utilexec "k8s.io/utils/exec" utilexec "k8s.io/utils/exec"
) )
@ -289,7 +290,7 @@ func NewProxier(ipt utiliptables.Interface,
nodeIP = net.ParseIP("127.0.0.1") nodeIP = net.ParseIP("127.0.0.1")
} }
isIPv6 := conntrack.IsIPv6(nodeIP) isIPv6 := utilnet.IsIPv6(nodeIP)
glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6) glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)
@ -1006,7 +1007,7 @@ func (proxier *Proxier) syncProxyRules() {
continue continue
} }
if lp.Protocol == "udp" { if lp.Protocol == "udp" {
isIPv6 := conntrack.IsIPv6(svcInfo.ClusterIP) isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, clientv1.ProtocolUDP) conntrack.ClearEntriesForPort(proxier.exec, lp.Port, isIPv6, clientv1.ProtocolUDP)
} }
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket

View File

@ -24,7 +24,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/helper" "k8s.io/kubernetes/pkg/apis/core/helper"
"k8s.io/kubernetes/pkg/util/conntrack" utilnet "k8s.io/kubernetes/pkg/util/net"
"github.com/golang/glog" "github.com/golang/glog"
) )
@ -117,10 +117,10 @@ func GetNodeAddresses(cidrs []string, nw NetworkInterfacer) (sets.String, error)
return nil, fmt.Errorf("error parsing CIDR for interface %s, error: %v", itf.Name, err) return nil, fmt.Errorf("error parsing CIDR for interface %s, error: %v", itf.Name, err)
} }
if ipNet.Contains(ip) { if ipNet.Contains(ip) {
if conntrack.IsIPv6(ip) && !uniqueAddressList.Has(IPv6ZeroCIDR) { if utilnet.IsIPv6(ip) && !uniqueAddressList.Has(IPv6ZeroCIDR) {
uniqueAddressList.Insert(ip.String()) uniqueAddressList.Insert(ip.String())
} }
if !conntrack.IsIPv6(ip) && !uniqueAddressList.Has(IPv4ZeroCIDR) { if !utilnet.IsIPv6(ip) && !uniqueAddressList.Has(IPv4ZeroCIDR) {
uniqueAddressList.Insert(ip.String()) uniqueAddressList.Insert(ip.String())
} }
} }

View File

@ -18,11 +18,11 @@ package conntrack
import ( import (
"fmt" "fmt"
"net"
"strconv" "strconv"
"strings" "strings"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/utils/exec" "k8s.io/utils/exec"
) )
@ -31,17 +31,6 @@ import (
// NoConnectionToDelete is the error string returned by conntrack when no matching connections are found // NoConnectionToDelete is the error string returned by conntrack when no matching connections are found
const NoConnectionToDelete = "0 flow entries have been deleted" const NoConnectionToDelete = "0 flow entries have been deleted"
// IsIPv6 returns true if the given ip address is a valid ipv6 address
func IsIPv6(netIP net.IP) bool {
return netIP != nil && netIP.To4() == nil
}
// IsIPv6String returns true if the given string is a valid ipv6 address
func IsIPv6String(ip string) bool {
netIP := net.ParseIP(ip)
return IsIPv6(netIP)
}
func protoStr(proto v1.Protocol) string { func protoStr(proto v1.Protocol) string {
return strings.ToLower(string(proto)) return strings.ToLower(string(proto))
} }
@ -56,7 +45,7 @@ func parametersWithFamily(isIPv6 bool, parameters ...string) []string {
// ClearEntriesForIP uses the conntrack tool to delete the conntrack entries // ClearEntriesForIP uses the conntrack tool to delete the conntrack entries
// for the UDP connections specified by the given service IP // for the UDP connections specified by the given service IP
func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) error { func ClearEntriesForIP(execer exec.Interface, ip string, protocol v1.Protocol) error {
parameters := parametersWithFamily(IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol)) parameters := parametersWithFamily(utilnet.IsIPv6String(ip), "-D", "--orig-dst", ip, "-p", protoStr(protocol))
err := Exec(execer, parameters...) err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
@ -107,7 +96,7 @@ func ClearEntriesForPort(execer exec.Interface, port int, isIPv6 bool, protocol
// ClearEntriesForNAT uses the conntrack tool to delete the conntrack entries // ClearEntriesForNAT uses the conntrack tool to delete the conntrack entries
// for connections specified by the {origin, dest} IP pair. // for connections specified by the {origin, dest} IP pair.
func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.Protocol) error { func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.Protocol) error {
parameters := parametersWithFamily(IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest, parameters := parametersWithFamily(utilnet.IsIPv6String(origin), "-D", "--orig-dst", origin, "--dst-nat", dest,
"-p", protoStr(protocol)) "-p", protoStr(protocol))
err := Exec(execer, parameters...) err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {

View File

@ -18,11 +18,11 @@ package conntrack
import ( import (
"fmt" "fmt"
"net"
"strings" "strings"
"testing" "testing"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/utils/exec" "k8s.io/utils/exec"
fakeexec "k8s.io/utils/exec/testing" fakeexec "k8s.io/utils/exec/testing"
) )
@ -119,7 +119,7 @@ func TestClearUDPConntrackForIP(t *testing.T) {
if err := ClearEntriesForIP(&fexec, tc.ip, v1.ProtocolUDP); err != nil { if err := ClearEntriesForIP(&fexec, tc.ip, v1.ProtocolUDP); err != nil {
t.Errorf("%s test case:, Unexpected error: %v", tc.name, err) t.Errorf("%s test case:, Unexpected error: %v", tc.name, err)
} }
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(IsIPv6String(tc.ip)) expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.ip) + familyParamStr(utilnet.IsIPv6String(tc.ip))
execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ") execCommand := strings.Join(fcmd.CombinedOutputLog[svcCount], " ")
if expectCommand != execCommand { if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
@ -223,7 +223,7 @@ func TestDeleteUDPConnections(t *testing.T) {
if err != nil { if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err) t.Errorf("%s test case: unexpected error: %v", tc.name, err)
} }
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(IsIPv6String(tc.origin)) expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.origin))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand { if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
@ -234,99 +234,3 @@ func TestDeleteUDPConnections(t *testing.T) {
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls) t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
} }
} }
func TestIsIPv6String(t *testing.T) {
testCases := []struct {
ip string
expectIPv6 bool
}{
{
ip: "127.0.0.1",
expectIPv6: false,
},
{
ip: "192.168.0.0",
expectIPv6: false,
},
{
ip: "1.2.3.4",
expectIPv6: false,
},
{
ip: "bad ip",
expectIPv6: false,
},
{
ip: "::1",
expectIPv6: true,
},
{
ip: "fd00::600d:f00d",
expectIPv6: true,
},
{
ip: "2001:db8::5",
expectIPv6: true,
},
}
for i := range testCases {
isIPv6 := IsIPv6String(testCases[i].ip)
if isIPv6 != testCases[i].expectIPv6 {
t.Errorf("[%d] Expect ipv6 %v, got %v", i+1, testCases[i].expectIPv6, isIPv6)
}
}
}
func TestIsIPv6(t *testing.T) {
testCases := []struct {
ip net.IP
expectIPv6 bool
}{
{
ip: net.IPv4zero,
expectIPv6: false,
},
{
ip: net.IPv4bcast,
expectIPv6: false,
},
{
ip: net.ParseIP("127.0.0.1"),
expectIPv6: false,
},
{
ip: net.ParseIP("10.20.40.40"),
expectIPv6: false,
},
{
ip: net.ParseIP("172.17.3.0"),
expectIPv6: false,
},
{
ip: nil,
expectIPv6: false,
},
{
ip: net.IPv6loopback,
expectIPv6: true,
},
{
ip: net.IPv6zero,
expectIPv6: true,
},
{
ip: net.ParseIP("fd00::600d:f00d"),
expectIPv6: true,
},
{
ip: net.ParseIP("2001:db8::5"),
expectIPv6: true,
},
}
for i := range testCases {
isIPv6 := IsIPv6(testCases[i].ip)
if isIPv6 != testCases[i].expectIPv6 {
t.Errorf("[%d] Expect ipv6 %v, got %v", i+1, testCases[i].expectIPv6, isIPv6)
}
}
}

4
pkg/util/net/OWNERS Normal file
View File

@ -0,0 +1,4 @@
reviewers:
- sig-network-reviewers
approvers:
- sig-network-approvers

32
pkg/util/net/net.go Normal file
View File

@ -0,0 +1,32 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"net"
)
// IsIPv6 returns if netIP is IPv6.
func IsIPv6(netIP net.IP) bool {
return netIP != nil && netIP.To4() == nil
}
// IsIPv6String returns if ip is IPv6.
func IsIPv6String(ip string) bool {
netIP := net.ParseIP(ip)
return IsIPv6(netIP)
}

118
pkg/util/net/net_test.go Normal file
View File

@ -0,0 +1,118 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package net
import (
"net"
"testing"
)
func TestIsIPv6String(t *testing.T) {
testCases := []struct {
ip string
expectIPv6 bool
}{
{
ip: "127.0.0.1",
expectIPv6: false,
},
{
ip: "192.168.0.0",
expectIPv6: false,
},
{
ip: "1.2.3.4",
expectIPv6: false,
},
{
ip: "bad ip",
expectIPv6: false,
},
{
ip: "::1",
expectIPv6: true,
},
{
ip: "fd00::600d:f00d",
expectIPv6: true,
},
{
ip: "2001:db8::5",
expectIPv6: true,
},
}
for i := range testCases {
isIPv6 := IsIPv6String(testCases[i].ip)
if isIPv6 != testCases[i].expectIPv6 {
t.Errorf("[%d] Expect ipv6 %v, got %v", i+1, testCases[i].expectIPv6, isIPv6)
}
}
}
func TestIsIPv6(t *testing.T) {
testCases := []struct {
ip net.IP
expectIPv6 bool
}{
{
ip: net.IPv4zero,
expectIPv6: false,
},
{
ip: net.IPv4bcast,
expectIPv6: false,
},
{
ip: net.ParseIP("127.0.0.1"),
expectIPv6: false,
},
{
ip: net.ParseIP("10.20.40.40"),
expectIPv6: false,
},
{
ip: net.ParseIP("172.17.3.0"),
expectIPv6: false,
},
{
ip: nil,
expectIPv6: false,
},
{
ip: net.IPv6loopback,
expectIPv6: true,
},
{
ip: net.IPv6zero,
expectIPv6: true,
},
{
ip: net.ParseIP("fd00::600d:f00d"),
expectIPv6: true,
},
{
ip: net.ParseIP("2001:db8::5"),
expectIPv6: true,
},
}
for i := range testCases {
isIPv6 := IsIPv6(testCases[i].ip)
if isIPv6 != testCases[i].expectIPv6 {
t.Errorf("[%d] Expect ipv6 %v, got %v", i+1, testCases[i].expectIPv6, isIPv6)
}
}
}