/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package netpol

import (
	"context"
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	netutils "k8s.io/utils/net"
)

// defaultPollIntervalSeconds is the default number of seconds the prober waits between probe attempts.
const defaultPollIntervalSeconds = 1

// defaultPollTimeoutSeconds is the default timeout, in seconds, when polling on probes.
const defaultPollTimeoutSeconds = 10

// TestPod represents an actual running pod. For each Pod defined by the model,
// there will be a corresponding TestPod. TestPod includes some runtime info
// (namespace name, service IP) which is not available in the model.
type TestPod struct {
	Namespace     string
	Name          string
	ContainerName string
	ServiceIP     string
}

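// PodString returns the namespace/name identifier of this pod.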
func (pod TestPod) PodString() PodString {
	return NewPodString(pod.Namespace, pod.Name)
}

// kubeManager provides a convenience interface to kube functionality that we leverage for polling NetworkPolicy connections.
// Its responsibilities are:
// - creating resources (pods, deployments, namespaces, services, network policies)
// - modifying and cleaning up resources
type kubeManager struct {
	framework      *framework.Framework
	clientSet      clientset.Interface
	namespaceNames []string
	allPods        []TestPod
	allPodStrings  []PodString
	dnsDomain      string
}

// newKubeManager is a utility function that wraps creation of the kubeManager instance.
func newKubeManager(framework *framework.Framework, dnsDomain string) *kubeManager {
	return &kubeManager{
		framework: framework,
		clientSet: framework.ClientSet,
		dnsDomain: dnsDomain,
	}
}

// initializeClusterFromModel initializes the cluster, creating namespaces, pods, and services as needed.
func (k *kubeManager) initializeClusterFromModel(ctx context.Context, model *Model) error {
	var createdPods []*v1.Pod
	for _, ns := range model.Namespaces {
		// no labels needed, we just need the default kubernetes.io/metadata.name label
		namespace, err := k.framework.CreateNamespace(ctx, ns.BaseName, nil)
		if err != nil {
			return err
		}
		namespaceName := namespace.Name
		k.namespaceNames = append(k.namespaceNames, namespaceName)

		for _, pod := range ns.Pods {
			framework.Logf("creating pod %s/%s with matching service", namespaceName, pod.Name)

			// note that we defer the logic of pod (i.e. node selector) specifics to the model,
			// which is aware of linux vs. windows pods
			kubePod, err := k.createPod(ctx, pod.KubePod(namespaceName))
			if err != nil {
				return err
			}

			createdPods = append(createdPods, kubePod)
			svc, err := k.createService(ctx, pod.Service(namespaceName))
			if err != nil {
				return err
			}
			if netutils.ParseIPSloppy(svc.Spec.ClusterIP) == nil {
				return fmt.Errorf("empty IP address found for service %s/%s", svc.Namespace, svc.Name)
			}

			k.allPods = append(k.allPods, TestPod{
				Namespace:     kubePod.Namespace,
				Name:          kubePod.Name,
				ContainerName: pod.Containers[0].Name(),
				ServiceIP:     svc.Spec.ClusterIP,
			})
			k.allPodStrings = append(k.allPodStrings, NewPodString(kubePod.Namespace, kubePod.Name))
		}
	}

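	// wait for all created pods to be running before returning, so that later probes do not race pod startup.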
	for _, createdPod := range createdPods {
		err := e2epod.WaitForPodRunningInNamespace(ctx, k.clientSet, createdPod)
		if err != nil {
			return fmt.Errorf("unable to wait for pod %s/%s: %w", createdPod.Namespace, createdPod.Name, err)
		}
	}

	return nil
}

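// AllPods returns the runtime info (TestPod) of every pod created during cluster initialization.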
func (k *kubeManager) AllPods() []TestPod {
	return k.allPods
}

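// AllPodStrings returns the PodString identifier of every pod created during cluster initialization.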
func (k *kubeManager) AllPodStrings() []PodString {
	return k.allPodStrings
}

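// DNSDomain returns the cluster DNS domain this kubeManager was configured with.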
func (k *kubeManager) DNSDomain() string {
	return k.dnsDomain
}

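// NamespaceNames returns the names of the namespaces created during cluster initialization.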
func (k *kubeManager) NamespaceNames() []string {
	return k.namespaceNames
}

// getPod gets a pod by namespace and name.
func (k *kubeManager) getPod(ctx context.Context, ns string, name string) (*v1.Pod, error) {
	kubePod, err := k.clientSet.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("unable to get pod %s/%s: %w", ns, name, err)
	}
	return kubePod, nil
}

// probeConnectivity execs into a pod and checks its connectivity to another pod.
// Implements the Prober interface.
func (k *kubeManager) probeConnectivity(args *probeConnectivityArgs) (bool, string, error) {
	port := strconv.Itoa(args.toPort)
	if args.addrTo == "" {
		return false, "no IP provided", fmt.Errorf("empty addrTo field")
	}
	framework.Logf("Starting probe from pod %v to %v", args.podFrom, args.addrTo)
	var cmd []string
	timeout := fmt.Sprintf("--timeout=%vs", args.timeoutSeconds)

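	// the probe runs agnhost's "connect" subcommand in the source pod; it exits non-zero when the
	// connection cannot be established within the timeout, which surfaces below as an exec error.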
	switch args.protocol {
	case v1.ProtocolSCTP:
		cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=sctp"}
	case v1.ProtocolTCP:
		cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=tcp"}
	case v1.ProtocolUDP:
		cmd = []string{"/agnhost", "connect", net.JoinHostPort(args.addrTo, port), timeout, "--protocol=udp"}
		if framework.NodeOSDistroIs("windows") {
			framework.Logf("probing UDP for windows may result in cluster instability for certain windows nodes with low CPU/Memory, depending on CRI version")
		}
	default:
		framework.Failf("protocol %s not supported", args.protocol)
	}

	commandDebugString := fmt.Sprintf("kubectl exec %s -c %s -n %s -- %s", args.podFrom, args.containerFrom, args.nsFrom, strings.Join(cmd, " "))

	attempt := 0

	// NOTE: probeConnectivity should return true if the probe succeeds and false otherwise.

	// probeError will determine the return value of this probeConnectivity call.
	var probeError error
	var stderr string

	// Instead of re-running the job on a connectivity failure, the following conditionFunc, when passed to PollImmediate,
	// re-runs the probe whenever the observed value doesn't match the expected value. We don't rely on the return value of
	// PollImmediate; we simply discard it and use probeError, defined outside the scope of conditionFunc, to return the
	// result of probeConnectivity.
	conditionFunc := func() (bool, error) {
		_, stderr, probeError = k.executeRemoteCommand(args.nsFrom, args.podFrom, args.containerFrom, cmd)
		// a retry should only occur if the expected and observed values don't match.
		if args.expectConnectivity {
			if probeError != nil {
				// since we expect connectivity here, we fail the condition so that PollImmediate reattempts the probe.
				// this happens in cases where the network is congested: there is no policy rejecting traffic
				// from "podFrom" to "podTo", yet probes from "podFrom" to "podTo" are failing.
				framework.Logf("probe #%d :: connectivity expected :: %s/%s -> %s :: stderr - %s",
					attempt+1, args.nsFrom, args.podFrom, args.addrTo, stderr,
				)
				attempt++
				return false, nil
			} else {
				// we got the expected result, exit immediately.
				return true, nil
			}
		} else {
			if probeError != nil {
				// we got the expected result, exit immediately.
				return true, nil
			} else {
				// since we don't expect connectivity here, we fail the condition so that PollImmediate reattempts the probe.
				// this happens in cases where a policy rejecting traffic from "podFrom" to "podTo" exists, but the CNI takes
				// time to implement the policy, and a probe from "podFrom" to "podTo" succeeded in that window.
				framework.Logf("probe #%d :: connectivity not expected :: %s/%s -> %s",
					attempt+1, args.nsFrom, args.podFrom, args.addrTo,
				)
				attempt++
				return false, nil
			}
		}
	}

	// ignore the result of PollImmediate, we are only concerned with probeError.
	_ = wait.PollImmediate(
		time.Duration(args.pollIntervalSeconds)*time.Second,
		time.Duration(args.pollTimeoutSeconds)*time.Second,
		conditionFunc,
	)

	if probeError != nil {
		return false, commandDebugString, nil
	}
	return true, commandDebugString, nil
}

// executeRemoteCommand executes a remote shell command on the given pod.
func (k *kubeManager) executeRemoteCommand(namespace string, pod string, containerName string, command []string) (string, string, error) {
	return e2epod.ExecWithOptions(k.framework, e2epod.ExecOptions{
		Command:            command,
		Namespace:          namespace,
		PodName:            pod,
		ContainerName:      containerName,
		Stdin:              nil,
		CaptureStdout:      true,
		CaptureStderr:      true,
		PreserveWhitespace: false,
	})
}

// createService is a convenience function for service setup.
func (k *kubeManager) createService(ctx context.Context, service *v1.Service) (*v1.Service, error) {
	ns := service.Namespace
	name := service.Name

	createdService, err := k.clientSet.CoreV1().Services(ns).Create(ctx, service, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("unable to create service %s/%s: %w", ns, name, err)
	}
	return createdService, nil
}

// createPod is a convenience function for pod setup.
func (k *kubeManager) createPod(ctx context.Context, pod *v1.Pod) (*v1.Pod, error) {
	ns := pod.Namespace
	framework.Logf("creating pod %s/%s", ns, pod.Name)

	createdPod, err := k.clientSet.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("unable to create pod %s/%s: %w", ns, pod.Name, err)
	}
	return createdPod, nil
}

// cleanNetworkPolicies is a convenience function for deleting network policies before startup of any new test.
func (k *kubeManager) cleanNetworkPolicies(ctx context.Context) error {
	for _, ns := range k.namespaceNames {
		framework.Logf("deleting policies in %s ..........", ns)
		l, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).List(ctx, metav1.ListOptions{})
		if err != nil {
			return fmt.Errorf("unable to list network policies in ns %s: %w", ns, err)
		}
		for _, np := range l.Items {
			framework.Logf("deleting network policy %s/%s", ns, np.Name)
			err = k.clientSet.NetworkingV1().NetworkPolicies(ns).Delete(ctx, np.Name, metav1.DeleteOptions{})
			if err != nil {
				return fmt.Errorf("unable to delete network policy %s/%s: %w", ns, np.Name, err)
			}
		}
	}
	return nil
}

// createNetworkPolicy is a convenience function for creating network policies.
func (k *kubeManager) createNetworkPolicy(ctx context.Context, ns string, netpol *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
	framework.Logf("creating network policy %s/%s", ns, netpol.Name)
	netpol.ObjectMeta.Namespace = ns
	np, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).Create(ctx, netpol, metav1.CreateOptions{})
	if err != nil {
		return nil, fmt.Errorf("unable to create network policy %s/%s: %w", ns, netpol.Name, err)
	}
	return np, nil
}

// updateNetworkPolicy is a convenience function for updating network policies.
func (k *kubeManager) updateNetworkPolicy(ctx context.Context, ns string, netpol *networkingv1.NetworkPolicy) (*networkingv1.NetworkPolicy, error) {
	framework.Logf("updating network policy %s/%s", ns, netpol.Name)
	netpol.ObjectMeta.Namespace = ns
	np, err := k.clientSet.NetworkingV1().NetworkPolicies(ns).Update(ctx, netpol, metav1.UpdateOptions{})
	if err != nil {
		return np, fmt.Errorf("unable to update network policy %s/%s: %w", ns, netpol.Name, err)
	}
	return np, nil
}

// getNamespace gets a namespace object from kubernetes.
func (k *kubeManager) getNamespace(ctx context.Context, ns string) (*v1.Namespace, error) {
	selectedNameSpace, err := k.clientSet.CoreV1().Namespaces().Get(ctx, ns, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("unable to get namespace %s: %w", ns, err)
	}
	return selectedNameSpace, nil
}

// getProbeTimeoutSeconds returns the timeout, in seconds, for how long a probe may run before failing a check,
// taking windows heuristics into account, since requests there can sometimes take longer.
func getProbeTimeoutSeconds() int {
	timeoutSeconds := 1
	if framework.NodeOSDistroIs("windows") {
		timeoutSeconds = 3
	}
	return timeoutSeconds
}

// getWorkers returns the number of workers suggested to run when testing.
func getWorkers() int {
	return 3
}

// getPollIntervalSeconds returns the interval, in seconds, that the prober waits between attempts.
func getPollIntervalSeconds() int {
	return defaultPollIntervalSeconds
}

// getPollTimeoutSeconds returns the timeout, in seconds, for polling on probes, taking windows heuristics into
// account, since requests there can sometimes take longer.
func getPollTimeoutSeconds() int {
	if framework.NodeOSDistroIs("windows") {
		return defaultPollTimeoutSeconds * 2
	}
	return defaultPollTimeoutSeconds
}