Merge pull request #114027 from anggao/master

e2e: make GetSubnetPrefix IP family agnostic
This commit is contained in:
Kubernetes Prow Robot 2022-12-12 02:45:33 -08:00 committed by GitHub
commit cef3e3ffe6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 51 deletions

View File

@ -377,42 +377,6 @@ func GetRandomReadySchedulableNode(c clientset.Interface) (*v1.Node, error) {
return &nodes.Items[rand.Intn(len(nodes.Items))], nil return &nodes.Items[rand.Intn(len(nodes.Items))], nil
} }
// GetSubnetPrefix gets first 2 number of an IP in the node subnet. [IPv4]
// It assumes that the subnet mask is /16.
func GetSubnetPrefix(c clientset.Interface) ([]string, error) {
node, err := GetReadySchedulableWorkerNode(c)
if err != nil {
return nil, fmt.Errorf("error getting a ready schedulable worker Node, err: %v", err)
}
internalIP, err := GetInternalIP(node)
if err != nil {
return nil, fmt.Errorf("error getting Node internal IP, err: %v", err)
}
splitted := strings.Split(internalIP, ".")
if len(splitted) == 4 {
return splitted[:2], nil
}
return nil, fmt.Errorf("invalid IP address format: %s", internalIP)
}
// GetReadySchedulableWorkerNode gets a single worker node which is available for
// running pods on. If there are no such available nodes it will return an error.
func GetReadySchedulableWorkerNode(c clientset.Interface) (*v1.Node, error) {
nodes, err := GetReadySchedulableNodes(c)
if err != nil {
return nil, err
}
for i := range nodes.Items {
node := nodes.Items[i]
_, isMaster := node.Labels["node-role.kubernetes.io/master"]
_, isControlPlane := node.Labels["node-role.kubernetes.io/control-plane"]
if !isMaster && !isControlPlane {
return &node, nil
}
}
return nil, fmt.Errorf("there are currently no ready, schedulable worker nodes in the cluster")
}
// GetReadyNodesIncludingTainted returns all ready nodes, even those which are tainted. // GetReadyNodesIncludingTainted returns all ready nodes, even those which are tainted.
// There are cases when we care about tainted nodes // There are cases when we care about tainted nodes
// E.g. in tests related to nodes with gpu we care about nodes despite // E.g. in tests related to nodes with gpu we care about nodes despite

View File

@ -19,6 +19,7 @@ package network
import ( import (
"context" "context"
"fmt" "fmt"
"math/big"
"net" "net"
"strconv" "strconv"
"strings" "strings"
@ -54,17 +55,72 @@ import (
"github.com/onsi/gomega" "github.com/onsi/gomega"
) )
// getInternalIP returns node internal IP
func getInternalIP(node *v1.Node) (string, error) {
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeInternalIP && address.Address != "" {
return address.Address, nil
}
}
return "", fmt.Errorf("couldn't get the internal IP of host %s with addresses %v", node.Name, node.Status.Addresses)
}
// getSubnetPrefix returns a network prefix based on one of the workers
// InternalIP adding a /16 or /64 mask depending on the IP family of the node.
// IMPORTANT: These assumes a flat network assigned to the nodes, that is common
// on cloud providers.
func getSubnetPrefix(c clientset.Interface) (*net.IPNet, error) {
node, err := getReadySchedulableWorkerNode(c)
if err != nil {
return nil, fmt.Errorf("error getting a ready schedulable worker Node, err: %v", err)
}
internalIP, err := getInternalIP(node)
if err != nil {
return nil, fmt.Errorf("error getting Node internal IP, err: %v", err)
}
ip := netutils.ParseIPSloppy(internalIP)
if ip == nil {
return nil, fmt.Errorf("invalid IP address format: %s", internalIP)
}
// if IPv6 return a net.IPNet with IP = ip and mask /64
ciderMask := net.CIDRMask(64, 128)
// if IPv4 return a net.IPNet with IP = ip and mask /16
if netutils.IsIPv4(ip) {
ciderMask = net.CIDRMask(16, 32)
}
return &net.IPNet{IP: ip.Mask(ciderMask), Mask: ciderMask}, nil
}
// getReadySchedulableWorkerNode gets a single worker node which is available for
// running pods on. If there are no such available nodes it will return an error.
func getReadySchedulableWorkerNode(c clientset.Interface) (*v1.Node, error) {
nodes, err := e2enode.GetReadySchedulableNodes(c)
if err != nil {
return nil, err
}
for i := range nodes.Items {
node := nodes.Items[i]
_, isMaster := node.Labels["node-role.kubernetes.io/master"]
_, isControlPlane := node.Labels["node-role.kubernetes.io/control-plane"]
if !isMaster && !isControlPlane {
return &node, nil
}
}
return nil, fmt.Errorf("there are currently no ready, schedulable worker nodes in the cluster")
}
var _ = common.SIGDescribe("LoadBalancers", func() { var _ = common.SIGDescribe("LoadBalancers", func() {
f := framework.NewDefaultFramework("loadbalancers") f := framework.NewDefaultFramework("loadbalancers")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
var cs clientset.Interface var cs clientset.Interface
var subnetPrefix []string var subnetPrefix *net.IPNet
var err error var err error
ginkgo.BeforeEach(func() { ginkgo.BeforeEach(func() {
cs = f.ClientSet cs = f.ClientSet
subnetPrefix, err = e2enode.GetSubnetPrefix(cs) subnetPrefix, err = getSubnetPrefix(cs)
framework.ExpectNoError(err) framework.ExpectNoError(err)
}) })
@ -485,7 +541,7 @@ var _ = common.SIGDescribe("LoadBalancers", func() {
}) })
ginkgo.It("should only allow access from service loadbalancer source ranges [Slow]", func(ctx context.Context) { ginkgo.It("should only allow access from service loadbalancer source ranges [Slow]", func(ctx context.Context) {
// this feature currently supported only on GCE/GKE/AWS // this feature currently supported only on GCE/GKE/AWS/AZURE
e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws", "azure") e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws", "azure")
loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs) loadBalancerCreateTimeout := e2eservice.GetServiceLoadBalancerCreationTimeout(cs)
@ -586,8 +642,12 @@ var _ = common.SIGDescribe("LoadBalancers", func() {
isInternalEndpoint := func(lbIngress *v1.LoadBalancerIngress) bool { isInternalEndpoint := func(lbIngress *v1.LoadBalancerIngress) bool {
ingressEndpoint := e2eservice.GetIngressPoint(lbIngress) ingressEndpoint := e2eservice.GetIngressPoint(lbIngress)
ingressIP := netutils.ParseIPSloppy(ingressEndpoint)
if ingressIP == nil {
framework.Failf("invalid ingressEndpoint IP address format: %s", ingressEndpoint)
}
// Needs update for providers using hostname as endpoint. // Needs update for providers using hostname as endpoint.
return strings.HasPrefix(ingressEndpoint, subnetPrefix[0]+".") return subnetPrefix.Contains(ingressIP)
} }
ginkgo.By("creating a service with type LoadBalancer and cloud specific Internal-LB annotation enabled") ginkgo.By("creating a service with type LoadBalancer and cloud specific Internal-LB annotation enabled")
@ -667,7 +727,10 @@ var _ = common.SIGDescribe("LoadBalancers", func() {
if framework.ProviderIs("azure") { if framework.ProviderIs("azure") {
ginkgo.By("switching back to interal type LoadBalancer, with static IP specified.") ginkgo.By("switching back to interal type LoadBalancer, with static IP specified.")
// For a cluster created with CAPZ, node-subnet may not be "10.240.0.0/16", e.g. "10.1.0.0/16". // For a cluster created with CAPZ, node-subnet may not be "10.240.0.0/16", e.g. "10.1.0.0/16".
internalStaticIP := fmt.Sprintf("%s.%s.11.11", subnetPrefix[0], subnetPrefix[1]) base := netutils.BigForIP(subnetPrefix.IP)
offset := big.NewInt(0).SetBytes(netutils.ParseIPSloppy("0.0.11.11").To4()).Int64()
internalStaticIP := netutils.AddIPOffset(base, int(offset)).String()
svc, err = jig.UpdateService(func(svc *v1.Service) { svc, err = jig.UpdateService(func(svc *v1.Service) {
svc.Spec.LoadBalancerIP = internalStaticIP svc.Spec.LoadBalancerIP = internalStaticIP
@ -1244,7 +1307,7 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
var loadBalancerCreateTimeout time.Duration var loadBalancerCreateTimeout time.Duration
var cs clientset.Interface var cs clientset.Interface
var subnetPrefix []string var subnetPrefix *net.IPNet
var err error var err error
ginkgo.BeforeEach(func() { ginkgo.BeforeEach(func() {
@ -1253,7 +1316,7 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
cs = f.ClientSet cs = f.ClientSet
loadBalancerCreateTimeout = e2eservice.GetServiceLoadBalancerCreationTimeout(cs) loadBalancerCreateTimeout = e2eservice.GetServiceLoadBalancerCreationTimeout(cs)
subnetPrefix, err = e2enode.GetSubnetPrefix(cs) subnetPrefix, err = getSubnetPrefix(cs)
framework.ExpectNoError(err) framework.ExpectNoError(err)
}) })
@ -1301,12 +1364,21 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0]) ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
ginkgo.By("reading clientIP using the TCP service's service port via its external VIP") ginkgo.By("reading clientIP using the TCP service's service port via its external VIP")
clientIP, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip") clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, "/clientip")
framework.ExpectNoError(err) framework.ExpectNoError(err)
framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIP) framework.Logf("ClientIP detected by target pod using VIP:SvcPort is %s", clientIPPort)
ginkgo.By("checking if Source IP is preserved") ginkgo.By("checking if Source IP is preserved")
if strings.HasPrefix(clientIP, subnetPrefix[0]+".") { // The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
host, _, err := net.SplitHostPort(clientIPPort)
if err != nil {
framework.Failf("SplitHostPort returned unexpected error: %q", clientIPPort)
}
ip := netutils.ParseIPSloppy(host)
if ip == nil {
framework.Failf("Invalid client IP address format: %q", host)
}
if subnetPrefix.Contains(ip) {
framework.Failf("Source IP was NOT preserved") framework.Failf("Source IP was NOT preserved")
} }
}) })
@ -1581,11 +1653,22 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP)) ginkgo.By(fmt.Sprintf("checking source ip is NOT preserved through loadbalancer %v", ingressIP))
var clientIP string var clientIP string
pollErr := wait.PollImmediate(framework.Poll, 3*e2eservice.KubeProxyLagTimeout, func() (bool, error) { pollErr := wait.PollImmediate(framework.Poll, 3*e2eservice.KubeProxyLagTimeout, func() (bool, error) {
clientIP, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path) clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
if err != nil { if err != nil {
return false, nil return false, nil
} }
if strings.HasPrefix(clientIP, subnetPrefix[0]+".") { // The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
host, _, err := net.SplitHostPort(clientIPPort)
if err != nil {
framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
return false, nil
}
ip := netutils.ParseIPSloppy(host)
if ip == nil {
framework.Logf("Invalid client IP address format: %q", host)
return false, nil
}
if subnetPrefix.Contains(ip) {
return true, nil return true, nil
} }
return false, nil return false, nil
@ -1609,12 +1692,23 @@ var _ = common.SIGDescribe("LoadBalancers ESIPP [Slow]", func() {
framework.ExpectNoError(err) framework.ExpectNoError(err)
loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(cs) loadBalancerPropagationTimeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(cs)
pollErr = wait.PollImmediate(framework.PollShortTimeout, loadBalancerPropagationTimeout, func() (bool, error) { pollErr = wait.PollImmediate(framework.PollShortTimeout, loadBalancerPropagationTimeout, func() (bool, error) {
clientIP, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path) clientIPPort, err := GetHTTPContent(ingressIP, svcTCPPort, e2eservice.KubeProxyLagTimeout, path)
if err != nil { if err != nil {
return false, nil return false, nil
} }
ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIP)) ginkgo.By(fmt.Sprintf("Endpoint %v:%v%v returned client ip %v", ingressIP, svcTCPPort, path, clientIPPort))
if !strings.HasPrefix(clientIP, subnetPrefix[0]+".") { // The clientIPPort returned from GetHTTPContent is in this format: x.x.x.x:port or [xx:xx:xx::x]:port
host, _, err := net.SplitHostPort(clientIPPort)
if err != nil {
framework.Logf("SplitHostPort returned unexpected error: %q", clientIPPort)
return false, nil
}
ip := netutils.ParseIPSloppy(host)
if ip == nil {
framework.Logf("Invalid client IP address format: %q", host)
return false, nil
}
if !subnetPrefix.Contains(ip) {
return true, nil return true, nil
} }
return false, nil return false, nil