/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
	"context"
	"fmt"
	"math"
	"os"
	"strings"
	"sync"
	"time"

	apps "k8s.io/api/apps/v1"
	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	storagev1beta1 "k8s.io/api/storage/v1beta1"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/json"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/client-go/util/workqueue"
	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
	api "k8s.io/kubernetes/pkg/apis/core"
	extensionsinternal "k8s.io/kubernetes/pkg/apis/extensions"

	"k8s.io/klog/v2"
)

const (
	// String used to mark pod deletion
	nonExist = "NonExist"
)

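// removePtr returns the value pointed to by replicas, or 0 if the pointer is nil.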
func removePtr(replicas *int32) int32 {
	if replicas == nil {
		return 0
	}
	return *replicas
}

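// WaitUntilPodIsScheduled polls until the named pod has been assigned to a node
// (Spec.NodeName is set) or the timeout expires.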
func WaitUntilPodIsScheduled(c clientset.Interface, name, namespace string, timeout time.Duration) (*v1.Pod, error) {
	// Wait until it's scheduled
	p, err := c.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{ResourceVersion: "0"})
	if err == nil && p.Spec.NodeName != "" {
		return p, nil
	}
	pollingPeriod := 200 * time.Millisecond
	startTime := time.Now()
	for startTime.Add(timeout).After(time.Now()) {
		time.Sleep(pollingPeriod)
		p, err := c.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{ResourceVersion: "0"})
		if err == nil && p.Spec.NodeName != "" {
			return p, nil
		}
	}
	return nil, fmt.Errorf("timed out after %v when waiting for pod %v/%v to start", timeout, namespace, name)
}

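// RunPodAndGetNodeName creates the given pod and returns the name of the node it
// is scheduled to, or an error if scheduling does not happen within the timeout.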
func RunPodAndGetNodeName(c clientset.Interface, pod *v1.Pod, timeout time.Duration) (string, error) {
	name := pod.Name
	namespace := pod.Namespace
	if err := CreatePodWithRetries(c, namespace, pod); err != nil {
		return "", err
	}
	p, err := WaitUntilPodIsScheduled(c, name, namespace, timeout)
	if err != nil {
		return "", err
	}
	return p.Spec.NodeName, nil
}

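// RunObjectConfig is implemented by workload configs (replication controllers,
// deployments, replica sets and jobs) that can be run and inspected by these helpers.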
type RunObjectConfig interface {
	Run() error
	GetName() string
	GetNamespace() string
	GetKind() schema.GroupKind
	GetClient() clientset.Interface
	GetScalesGetter() scaleclient.ScalesGetter
	SetClient(clientset.Interface)
	SetScalesClient(scaleclient.ScalesGetter)
	GetReplicas() int
	GetLabelValue(string) (string, bool)
	GetGroupResource() schema.GroupResource
	GetGroupVersionResource() schema.GroupVersionResource
}

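// RCConfig describes a ReplicationController-style workload (also embedded by
// DeploymentConfig, ReplicaSetConfig and JobConfig) to be created and started by
// RunRC and related helpers.
//
// Illustrative usage (field values are examples only):
//
//	cfg := RCConfig{
//		Client:    client,
//		Name:      "load-test",
//		Namespace: "default",
//		Image:     "k8s.gcr.io/pause:3.7",
//		Replicas:  10,
//	}
//	if err := RunRC(cfg); err != nil {
//		klog.Fatalf("starting RC failed: %v", err)
//	}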
type RCConfig struct {
	Affinity                      *v1.Affinity
	Client                        clientset.Interface
	ScalesGetter                  scaleclient.ScalesGetter
	Image                         string
	Command                       []string
	Name                          string
	Namespace                     string
	PollInterval                  time.Duration
	Timeout                       time.Duration
	PodStatusFile                 *os.File
	Replicas                      int
	CpuRequest                    int64 // millicores
	CpuLimit                      int64 // millicores
	MemRequest                    int64 // bytes
	MemLimit                      int64 // bytes
	GpuLimit                      int64 // count
	ReadinessProbe                *v1.Probe
	DNSPolicy                     *v1.DNSPolicy
	PriorityClassName             string
	TerminationGracePeriodSeconds *int64
	Lifecycle                     *v1.Lifecycle

	// Env vars, set the same for every pod.
	Env map[string]string

	// Extra labels and annotations added to every pod.
	Labels      map[string]string
	Annotations map[string]string

	// Node selector for pods in the RC.
	NodeSelector map[string]string

	// Tolerations for pods in the RC.
	Tolerations []v1.Toleration

	// Ports to declare in the container (map of name to containerPort).
	Ports map[string]int
	// Ports to declare in the container as host and container ports.
	HostPorts map[string]int

	Volumes      []v1.Volume
	VolumeMounts []v1.VolumeMount

	// Pointer to a list of pods; if non-nil, will be set to a list of pods
	// created by this RC by RunRC.
	CreatedPods *[]*v1.Pod

	// Maximum allowable container failures. If exceeded, RunRC returns an error.
	// Defaults to max(1, replicas*0.01) if unspecified.
	MaxContainerFailures *int
	// Maximum allowed pod deletions count. If exceeded, RunRC returns an error.
	// Defaults to 0.
	MaxAllowedPodDeletions int

	// If Silent is false, starting the RC will print progress; otherwise only errors will be printed.
	Silent bool

	// If set, this function will be used (in addition to klog) to print log lines.
	LogFunc func(fmt string, args ...interface{})
	// If set, those functions will be used to gather data from Nodes - in integration tests where no
	// kubelets are running those variables should be nil.
	NodeDumpFunc      func(c clientset.Interface, nodeNames []string, logFunc func(fmt string, args ...interface{}))
	ContainerDumpFunc func(c clientset.Interface, ns string, logFunc func(fmt string, args ...interface{}))

	// Names of the secrets and configmaps to mount.
	SecretNames    []string
	ConfigMapNames []string

	ServiceAccountTokenProjections int

	// Additional containers to run in the pod.
	AdditionalContainers []v1.Container
}

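// RCConfigLog logs through LogFunc when it is set, and always through klog.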
func (rc *RCConfig) RCConfigLog(fmt string, args ...interface{}) {
	if rc.LogFunc != nil {
		rc.LogFunc(fmt, args...)
	}
	klog.Infof(fmt, args...)
}

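// DeploymentConfig is an RCConfig that is run as a Deployment.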
type DeploymentConfig struct {
	RCConfig
}

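// ReplicaSetConfig is an RCConfig that is run as a ReplicaSet.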
type ReplicaSetConfig struct {
	RCConfig
}

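// JobConfig is an RCConfig that is run as a Job.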
type JobConfig struct {
	RCConfig
}

// podInfo contains pod information useful for debugging e2e tests.
type podInfo struct {
	oldHostname string
	oldPhase    string
	hostname    string
	phase       string
}

// PodDiff is a map of pod name to podInfos
type PodDiff map[string]*podInfo

// String formats the given PodDiff and returns it as a string.
func (p PodDiff) String(ignorePhases sets.String) string {
	ret := ""
	for name, info := range p {
		if ignorePhases.Has(info.phase) {
			continue
		}
		if info.phase == nonExist {
			ret += fmt.Sprintf("Pod %v was deleted, had phase %v and host %v\n", name, info.oldPhase, info.oldHostname)
			continue
		}
		phaseChange, hostChange := false, false
		msg := fmt.Sprintf("Pod %v ", name)
		if info.oldPhase != info.phase {
			phaseChange = true
			if info.oldPhase == nonExist {
				msg += fmt.Sprintf("in phase %v ", info.phase)
			} else {
				msg += fmt.Sprintf("went from phase: %v -> %v ", info.oldPhase, info.phase)
			}
		}
		if info.oldHostname != info.hostname {
			hostChange = true
			if info.oldHostname == nonExist || info.oldHostname == "" {
				msg += fmt.Sprintf("assigned host %v ", info.hostname)
			} else {
				msg += fmt.Sprintf("went from host: %v -> %v ", info.oldHostname, info.hostname)
			}
		}
		if phaseChange || hostChange {
			ret += msg + "\n"
		}
	}
	return ret
}

// DeletedPods returns a slice of pods that were present at the beginning
// and then disappeared.
func (p PodDiff) DeletedPods() []string {
	var deletedPods []string
	for podName, podInfo := range p {
		if podInfo.hostname == nonExist {
			deletedPods = append(deletedPods, podName)
		}
	}
	return deletedPods
}

// Diff computes a PodDiff given 2 lists of pods.
func Diff(oldPods []*v1.Pod, curPods []*v1.Pod) PodDiff {
	podInfoMap := PodDiff{}

	// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
	for _, pod := range curPods {
		podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.NodeName, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
	}

	// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
	for _, pod := range oldPods {
		if info, ok := podInfoMap[pod.Name]; ok {
			info.oldHostname, info.oldPhase = pod.Spec.NodeName, string(pod.Status.Phase)
		} else {
			podInfoMap[pod.Name] = &podInfo{hostname: nonExist, phase: nonExist, oldHostname: pod.Spec.NodeName, oldPhase: string(pod.Status.Phase)}
		}
	}
	return podInfoMap
}

// RunDeployment launches (and verifies the correctness of) a Deployment
// and waits for all the pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling Cleanup).
func RunDeployment(config DeploymentConfig) error {
	err := config.create()
	if err != nil {
		return err
	}
	return config.start()
}

func (config *DeploymentConfig) Run() error {
	return RunDeployment(*config)
}

func (config *DeploymentConfig) GetKind() schema.GroupKind {
	return extensionsinternal.Kind("Deployment")
}

func (config *DeploymentConfig) GetGroupResource() schema.GroupResource {
	return extensionsinternal.Resource("deployments")
}

func (config *DeploymentConfig) GetGroupVersionResource() schema.GroupVersionResource {
	return extensionsinternal.SchemeGroupVersion.WithResource("deployments")
}

func (config *DeploymentConfig) create() error {
	deployment := &apps.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: apps.DeploymentSpec{
			Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"name": config.Name,
				},
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      map[string]string{"name": config.Name},
					Annotations: config.Annotations,
				},
				Spec: v1.PodSpec{
					Affinity:                      config.Affinity,
					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
					Containers: []v1.Container{
						{
							Name:      config.Name,
							Image:     config.Image,
							Command:   config.Command,
							Ports:     []v1.ContainerPort{{ContainerPort: 80}},
							Lifecycle: config.Lifecycle,
						},
					},
				},
			},
		},
	}

	if len(config.AdditionalContainers) > 0 {
		deployment.Spec.Template.Spec.Containers = append(deployment.Spec.Template.Spec.Containers, config.AdditionalContainers...)
	}

	if len(config.SecretNames) > 0 {
		attachSecrets(&deployment.Spec.Template, config.SecretNames)
	}
	if len(config.ConfigMapNames) > 0 {
		attachConfigMaps(&deployment.Spec.Template, config.ConfigMapNames)
	}

	for i := 0; i < config.ServiceAccountTokenProjections; i++ {
		attachServiceAccountTokenProjection(&deployment.Spec.Template, fmt.Sprintf("tok-%d", i))
	}

	config.applyTo(&deployment.Spec.Template)

	if err := CreateDeploymentWithRetries(config.Client, config.Namespace, deployment); err != nil {
		return fmt.Errorf("error creating deployment: %v", err)
	}
	config.RCConfigLog("Created deployment with name: %v, namespace: %v, replica count: %v", deployment.Name, config.Namespace, removePtr(deployment.Spec.Replicas))
	return nil
}

// RunReplicaSet launches (and verifies the correctness of) a ReplicaSet
// and waits until all the pods it launches reach the "Running" state.
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling Cleanup).
func RunReplicaSet(config ReplicaSetConfig) error {
	err := config.create()
	if err != nil {
		return err
	}
	return config.start()
}

func (config *ReplicaSetConfig) Run() error {
	return RunReplicaSet(*config)
}

func (config *ReplicaSetConfig) GetKind() schema.GroupKind {
	return extensionsinternal.Kind("ReplicaSet")
}

func (config *ReplicaSetConfig) GetGroupResource() schema.GroupResource {
	return extensionsinternal.Resource("replicasets")
}

func (config *ReplicaSetConfig) GetGroupVersionResource() schema.GroupVersionResource {
	return extensionsinternal.SchemeGroupVersion.WithResource("replicasets")
}

func (config *ReplicaSetConfig) create() error {
	rs := &apps.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: apps.ReplicaSetSpec{
			Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"name": config.Name,
				},
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      map[string]string{"name": config.Name},
					Annotations: config.Annotations,
				},
				Spec: v1.PodSpec{
					Affinity:                      config.Affinity,
					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
					Containers: []v1.Container{
						{
							Name:      config.Name,
							Image:     config.Image,
							Command:   config.Command,
							Ports:     []v1.ContainerPort{{ContainerPort: 80}},
							Lifecycle: config.Lifecycle,
						},
					},
				},
			},
		},
	}

	if len(config.AdditionalContainers) > 0 {
		rs.Spec.Template.Spec.Containers = append(rs.Spec.Template.Spec.Containers, config.AdditionalContainers...)
	}

	if len(config.SecretNames) > 0 {
		attachSecrets(&rs.Spec.Template, config.SecretNames)
	}
	if len(config.ConfigMapNames) > 0 {
		attachConfigMaps(&rs.Spec.Template, config.ConfigMapNames)
	}

	config.applyTo(&rs.Spec.Template)

	if err := CreateReplicaSetWithRetries(config.Client, config.Namespace, rs); err != nil {
		return fmt.Errorf("error creating replica set: %v", err)
	}
	config.RCConfigLog("Created replica set with name: %v, namespace: %v, replica count: %v", rs.Name, config.Namespace, removePtr(rs.Spec.Replicas))
	return nil
}

// RunJob launches (and verifies the correctness of) a Job
// and waits for all the pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling Cleanup).
func RunJob(config JobConfig) error {
	err := config.create()
	if err != nil {
		return err
	}
	return config.start()
}

func (config *JobConfig) Run() error {
	return RunJob(*config)
}

func (config *JobConfig) GetKind() schema.GroupKind {
	return batchinternal.Kind("Job")
}

func (config *JobConfig) GetGroupResource() schema.GroupResource {
	return batchinternal.Resource("jobs")
}

func (config *JobConfig) GetGroupVersionResource() schema.GroupVersionResource {
	return batchinternal.SchemeGroupVersion.WithResource("jobs")
}

func (config *JobConfig) create() error {
	job := &batch.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: batch.JobSpec{
			Parallelism: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
			Completions: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      map[string]string{"name": config.Name},
					Annotations: config.Annotations,
				},
				Spec: v1.PodSpec{
					Affinity:                      config.Affinity,
					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(nil),
					Containers: []v1.Container{
						{
							Name:      config.Name,
							Image:     config.Image,
							Command:   config.Command,
							Lifecycle: config.Lifecycle,
						},
					},
					RestartPolicy: v1.RestartPolicyOnFailure,
				},
			},
		},
	}

	if len(config.SecretNames) > 0 {
		attachSecrets(&job.Spec.Template, config.SecretNames)
	}
	if len(config.ConfigMapNames) > 0 {
		attachConfigMaps(&job.Spec.Template, config.ConfigMapNames)
	}

	config.applyTo(&job.Spec.Template)

	if err := CreateJobWithRetries(config.Client, config.Namespace, job); err != nil {
		return fmt.Errorf("error creating job: %v", err)
	}
	config.RCConfigLog("Created job with name: %v, namespace: %v, parallelism/completions: %v", job.Name, config.Namespace, job.Spec.Parallelism)
	return nil
}

// RunRC launches (and verifies the correctness of) a Replication Controller
// and waits for all the pods it spawns to become "Running".
// It's the caller's responsibility to clean up externally (i.e. use the
// namespace lifecycle for handling Cleanup).
func RunRC(config RCConfig) error {
	err := config.create()
	if err != nil {
		return err
	}
	return config.start()
}

func (config *RCConfig) Run() error {
	return RunRC(*config)
}

func (config *RCConfig) GetName() string {
	return config.Name
}

func (config *RCConfig) GetNamespace() string {
	return config.Namespace
}

func (config *RCConfig) GetKind() schema.GroupKind {
	return api.Kind("ReplicationController")
}

func (config *RCConfig) GetGroupResource() schema.GroupResource {
	return api.Resource("replicationcontrollers")
}

func (config *RCConfig) GetGroupVersionResource() schema.GroupVersionResource {
	return api.SchemeGroupVersion.WithResource("replicationcontrollers")
}

func (config *RCConfig) GetClient() clientset.Interface {
	return config.Client
}

func (config *RCConfig) GetScalesGetter() scaleclient.ScalesGetter {
	return config.ScalesGetter
}

func (config *RCConfig) SetClient(c clientset.Interface) {
	config.Client = c
}

func (config *RCConfig) SetScalesClient(getter scaleclient.ScalesGetter) {
	config.ScalesGetter = getter
}

func (config *RCConfig) GetReplicas() int {
	return config.Replicas
}

func (config *RCConfig) GetLabelValue(key string) (string, bool) {
	value, found := config.Labels[key]
	return value, found
}

func (config *RCConfig) create() error {
	dnsDefault := v1.DNSDefault
	if config.DNSPolicy == nil {
		config.DNSPolicy = &dnsDefault
	}
	one := int64(1)
	rc := &v1.ReplicationController{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: v1.ReplicationControllerSpec{
			Replicas: func(i int) *int32 { x := int32(i); return &x }(config.Replicas),
			Selector: map[string]string{
				"name": config.Name,
			},
			Template: &v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels:      map[string]string{"name": config.Name},
					Annotations: config.Annotations,
				},
				Spec: v1.PodSpec{
					Affinity: config.Affinity,
					Containers: []v1.Container{
						{
							Name:           config.Name,
							Image:          config.Image,
							Command:        config.Command,
							Ports:          []v1.ContainerPort{{ContainerPort: 80}},
							ReadinessProbe: config.ReadinessProbe,
							Lifecycle:      config.Lifecycle,
						},
					},
					DNSPolicy:                     *config.DNSPolicy,
					NodeSelector:                  config.NodeSelector,
					Tolerations:                   config.Tolerations,
					TerminationGracePeriodSeconds: config.getTerminationGracePeriodSeconds(&one),
					PriorityClassName:             config.PriorityClassName,
				},
			},
		},
	}

	if len(config.AdditionalContainers) > 0 {
		rc.Spec.Template.Spec.Containers = append(rc.Spec.Template.Spec.Containers, config.AdditionalContainers...)
	}

	if len(config.SecretNames) > 0 {
		attachSecrets(rc.Spec.Template, config.SecretNames)
	}
	if len(config.ConfigMapNames) > 0 {
		attachConfigMaps(rc.Spec.Template, config.ConfigMapNames)
	}

	config.applyTo(rc.Spec.Template)

	if err := CreateRCWithRetries(config.Client, config.Namespace, rc); err != nil {
		return fmt.Errorf("error creating replication controller: %v", err)
	}
	config.RCConfigLog("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, config.Namespace, removePtr(rc.Spec.Replicas))
	return nil
}

func (config *RCConfig) applyTo(template *v1.PodTemplateSpec) {
	for k, v := range config.Env {
		c := &template.Spec.Containers[0]
		c.Env = append(c.Env, v1.EnvVar{Name: k, Value: v})
	}
	for k, v := range config.Labels {
		template.ObjectMeta.Labels[k] = v
	}
	template.Spec.NodeSelector = make(map[string]string)
	for k, v := range config.NodeSelector {
		template.Spec.NodeSelector[k] = v
	}
	if config.Tolerations != nil {
		template.Spec.Tolerations = append([]v1.Toleration{}, config.Tolerations...)
	}
	for k, v := range config.Ports {
		c := &template.Spec.Containers[0]
		c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v)})
	}
	for k, v := range config.HostPorts {
		c := &template.Spec.Containers[0]
		c.Ports = append(c.Ports, v1.ContainerPort{Name: k, ContainerPort: int32(v), HostPort: int32(v)})
	}
	if config.CpuLimit > 0 || config.MemLimit > 0 || config.GpuLimit > 0 {
		template.Spec.Containers[0].Resources.Limits = v1.ResourceList{}
	}
	if config.CpuLimit > 0 {
		template.Spec.Containers[0].Resources.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuLimit, resource.DecimalSI)
	}
	if config.MemLimit > 0 {
		template.Spec.Containers[0].Resources.Limits[v1.ResourceMemory] = *resource.NewQuantity(config.MemLimit, resource.DecimalSI)
	}
	if config.CpuRequest > 0 || config.MemRequest > 0 {
		template.Spec.Containers[0].Resources.Requests = v1.ResourceList{}
	}
	if config.CpuRequest > 0 {
		template.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(config.CpuRequest, resource.DecimalSI)
	}
	if config.MemRequest > 0 {
		template.Spec.Containers[0].Resources.Requests[v1.ResourceMemory] = *resource.NewQuantity(config.MemRequest, resource.DecimalSI)
	}
	if config.GpuLimit > 0 {
		template.Spec.Containers[0].Resources.Limits["nvidia.com/gpu"] = *resource.NewQuantity(config.GpuLimit, resource.DecimalSI)
	}
	if config.Lifecycle != nil {
		template.Spec.Containers[0].Lifecycle = config.Lifecycle
	}
	if len(config.Volumes) > 0 {
		template.Spec.Volumes = config.Volumes
	}
	if len(config.VolumeMounts) > 0 {
		template.Spec.Containers[0].VolumeMounts = config.VolumeMounts
	}
	if config.PriorityClassName != "" {
		template.Spec.PriorityClassName = config.PriorityClassName
	}
}

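// RCStartupStatus aggregates the observed state of the pods created by a config
// while they are starting up.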
type RCStartupStatus struct {
	Expected              int
	Terminating           int
	Running               int
	RunningButNotReady    int
	Waiting               int
	Pending               int
	Scheduled             int
	Unknown               int
	Inactive              int
	FailedContainers      int
	Created               []*v1.Pod
	ContainerRestartNodes sets.String
}

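// String returns a human-readable summary of the startup status for the named workload.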
func (s *RCStartupStatus) String(name string) string {
	return fmt.Sprintf("%v Pods: %d out of %d created, %d running, %d pending, %d waiting, %d inactive, %d terminating, %d unknown, %d runningButNotReady ",
		name, len(s.Created), s.Expected, s.Running, s.Pending, s.Waiting, s.Inactive, s.Terminating, s.Unknown, s.RunningButNotReady)
}

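// ComputeRCStartupStatus computes an RCStartupStatus from the given pods, where
// expected is the desired number of replicas.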
func ComputeRCStartupStatus(pods []*v1.Pod, expected int) RCStartupStatus {
	startupStatus := RCStartupStatus{
		Expected:              expected,
		Created:               make([]*v1.Pod, 0, expected),
		ContainerRestartNodes: sets.NewString(),
	}
	for _, p := range pods {
		if p.DeletionTimestamp != nil {
			startupStatus.Terminating++
			continue
		}
		startupStatus.Created = append(startupStatus.Created, p)
		if p.Status.Phase == v1.PodRunning {
			ready := false
			for _, c := range p.Status.Conditions {
				if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
					ready = true
					break
				}
			}
			if ready {
				// Only count a pod as running when it is also ready.
				startupStatus.Running++
			} else {
				startupStatus.RunningButNotReady++
			}
			for _, v := range FailedContainers(p) {
				startupStatus.FailedContainers = startupStatus.FailedContainers + v.Restarts
				startupStatus.ContainerRestartNodes.Insert(p.Spec.NodeName)
			}
		} else if p.Status.Phase == v1.PodPending {
			if p.Spec.NodeName == "" {
				startupStatus.Waiting++
			} else {
				startupStatus.Pending++
			}
		} else if p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed {
			startupStatus.Inactive++
		} else if p.Status.Phase == v1.PodUnknown {
			startupStatus.Unknown++
		}
		// Record count of scheduled pods (useful for computing scheduler throughput).
		if p.Spec.NodeName != "" {
			startupStatus.Scheduled++
		}
	}
	return startupStatus
}

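// start waits for the pods created from the config to become running and ready,
// enforcing the configured limits on container failures and pod deletions.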
func (config *RCConfig) start() error {
	// Don't force tests to fail if they don't care about containers restarting.
	var maxContainerFailures int
	if config.MaxContainerFailures == nil {
		maxContainerFailures = int(math.Max(1.0, float64(config.Replicas)*.01))
	} else {
		maxContainerFailures = *config.MaxContainerFailures
	}

	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": config.Name}))

	ps, err := NewPodStore(config.Client, config.Namespace, label, fields.Everything())
	if err != nil {
		return err
	}
	defer ps.Stop()

	interval := config.PollInterval
	if interval <= 0 {
		interval = 10 * time.Second
	}
	timeout := config.Timeout
	if timeout <= 0 {
		timeout = 5 * time.Minute
	}
	oldPods := make([]*v1.Pod, 0)
	oldRunning := 0
	lastChange := time.Now()
	podDeletionsCount := 0
	for oldRunning != config.Replicas {
		time.Sleep(interval)

		pods := ps.List()
		startupStatus := ComputeRCStartupStatus(pods, config.Replicas)

		if config.CreatedPods != nil {
			*config.CreatedPods = startupStatus.Created
		}
		if !config.Silent {
			config.RCConfigLog(startupStatus.String(config.Name))
		}

		if config.PodStatusFile != nil {
			fmt.Fprintf(config.PodStatusFile, "%d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown, %d, runningButNotReady\n", startupStatus.Running, startupStatus.Pending, startupStatus.Waiting, startupStatus.Inactive, startupStatus.Unknown, startupStatus.RunningButNotReady)
		}

		if startupStatus.FailedContainers > maxContainerFailures {
			if config.NodeDumpFunc != nil {
				config.NodeDumpFunc(config.Client, startupStatus.ContainerRestartNodes.List(), config.RCConfigLog)
			}
			if config.ContainerDumpFunc != nil {
				// Get the logs from the failed containers to help diagnose what caused them to fail
				config.ContainerDumpFunc(config.Client, config.Namespace, config.RCConfigLog)
			}
			return fmt.Errorf("%d containers failed which is more than allowed %d", startupStatus.FailedContainers, maxContainerFailures)
		}

		diff := Diff(oldPods, pods)
		deletedPods := diff.DeletedPods()
		podDeletionsCount += len(deletedPods)
		if podDeletionsCount > config.MaxAllowedPodDeletions {
			// Number of pods which disappeared is over threshold
			err := fmt.Errorf("%d pods disappeared for %s: %v", podDeletionsCount, config.Name, strings.Join(deletedPods, ", "))
			config.RCConfigLog(err.Error())
			config.RCConfigLog(diff.String(sets.NewString()))
			return err
		}

		if len(pods) > len(oldPods) || startupStatus.Running > oldRunning {
			lastChange = time.Now()
		}
		oldPods = pods
		oldRunning = startupStatus.Running

		if time.Since(lastChange) > timeout {
			break
		}
	}

	if oldRunning != config.Replicas {
		// List only pods from a given replication controller.
		options := metav1.ListOptions{LabelSelector: label.String()}
		if pods, err := config.Client.CoreV1().Pods(config.Namespace).List(context.TODO(), options); err == nil {
			for _, pod := range pods.Items {
				config.RCConfigLog("Pod %s\t%s\t%s\t%s", pod.Name, pod.Spec.NodeName, pod.Status.Phase, pod.DeletionTimestamp)
			}
		} else {
			config.RCConfigLog("Can't list pod debug info: %v", err)
		}
		return fmt.Errorf("only %d pods started out of %d", oldRunning, config.Replicas)
	}
	return nil
}

// StartPods is a simplified version of RunRC that does not create an RC, but creates plain Pods.
// Optionally waits for pods to start running (if waitForRunning == true).
// The number of replicas must be non-zero.
func StartPods(c clientset.Interface, replicas int, namespace string, podNamePrefix string,
	pod v1.Pod, waitForRunning bool, logFunc func(fmt string, args ...interface{})) error {
	// no pod to start
	if replicas < 1 {
		panic("StartPods: number of replicas must be non-zero")
	}
	startPodsID := string(uuid.NewUUID()) // So that we can label and find them
	for i := 0; i < replicas; i++ {
		podName := fmt.Sprintf("%v-%v", podNamePrefix, i)
		pod.ObjectMeta.Name = podName
		pod.ObjectMeta.Labels["name"] = podName
		pod.ObjectMeta.Labels["startPodsID"] = startPodsID
		pod.Spec.Containers[0].Name = podName
		if err := CreatePodWithRetries(c, namespace, &pod); err != nil {
			return err
		}
	}
	logFunc("Waiting for running...")
	if waitForRunning {
		label := labels.SelectorFromSet(labels.Set(map[string]string{"startPodsID": startPodsID}))
		err := WaitForPodsWithLabelRunning(c, namespace, label)
		if err != nil {
			return fmt.Errorf("error waiting for %d pods to be running - probably a timeout: %v", replicas, err)
		}
	}
	return nil
}

// WaitForPodsWithLabelRunning waits up to 10 minutes for all matching pods to become
// Running and for at least one matching pod to exist.
func WaitForPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector) error {
	return WaitForEnoughPodsWithLabelRunning(c, ns, label, -1)
}

// WaitForEnoughPodsWithLabelRunning waits up to 10 minutes for at least 'replicas' pods
// to be Running and for at least one matching pod to exist. If 'replicas' is < 0, it
// waits for all matching pods to be running.
func WaitForEnoughPodsWithLabelRunning(c clientset.Interface, ns string, label labels.Selector, replicas int) error {
	running := false
	ps, err := NewPodStore(c, ns, label, fields.Everything())
	if err != nil {
		return err
	}
	defer ps.Stop()

	for start := time.Now(); time.Since(start) < 10*time.Minute; time.Sleep(5 * time.Second) {
		pods := ps.List()
		if len(pods) == 0 {
			continue
		}
		runningPodsCount := 0
		for _, p := range pods {
			if p.Status.Phase == v1.PodRunning {
				runningPodsCount++
			}
		}
		if (replicas < 0 && runningPodsCount < len(pods)) || (runningPodsCount < replicas) {
			continue
		}
		running = true
		break
	}
	if !running {
		return fmt.Errorf("timeout while waiting for pods with labels %q to be running", label.String())
	}
	return nil
}

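// CountToStrategy pairs a node count with the PrepareNodeStrategy to use for that
// many nodes.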
type CountToStrategy struct {
	Count    int
	Strategy PrepareNodeStrategy
}

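// TestNodePreparer prepares nodes before a test run and cleans them up afterwards.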
type TestNodePreparer interface {
	PrepareNodes(nextNodeIndex int) error
	CleanupNodes() error
}

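// PrepareNodeStrategy modifies nodes (and objects that depend on them) before a
// test starts and restores them after it finishes.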
type PrepareNodeStrategy interface {
	// Modify pre-created Node objects before the test starts.
	PreparePatch(node *v1.Node) []byte
	// Create or modify any objects that depend on the node before the test starts.
	// Caller will re-try when http.StatusConflict error is returned.
	PrepareDependentObjects(node *v1.Node, client clientset.Interface) error
	// Clean up any node modifications after the test finishes.
	CleanupNode(node *v1.Node) *v1.Node
	// Clean up any objects that depend on the node after the test finishes.
	// Caller will re-try when http.StatusConflict error is returned.
	CleanupDependentObjects(nodeName string, client clientset.Interface) error
}

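// TrivialNodePrepareStrategy is a no-op PrepareNodeStrategy.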
type TrivialNodePrepareStrategy struct{}

var _ PrepareNodeStrategy = &TrivialNodePrepareStrategy{}

func (*TrivialNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
	return []byte{}
}

func (*TrivialNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
	nodeCopy := *node
	return &nodeCopy
}

func (*TrivialNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
	return nil
}

func (*TrivialNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
	return nil
}

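// LabelNodePrepareStrategy labels each prepared node with LabelKey, cycling
// through LabelValues in round-robin order.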
type LabelNodePrepareStrategy struct {
	LabelKey      string
	LabelValues   []string
	roundRobinIdx int
}

var _ PrepareNodeStrategy = &LabelNodePrepareStrategy{}

func NewLabelNodePrepareStrategy(labelKey string, labelValues ...string) *LabelNodePrepareStrategy {
	return &LabelNodePrepareStrategy{
		LabelKey:    labelKey,
		LabelValues: labelValues,
	}
}

func (s *LabelNodePrepareStrategy) PreparePatch(*v1.Node) []byte {
	labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, s.LabelValues[s.roundRobinIdx])
	patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
	s.roundRobinIdx++
	if s.roundRobinIdx == len(s.LabelValues) {
		s.roundRobinIdx = 0
	}
	return []byte(patch)
}

func (s *LabelNodePrepareStrategy) CleanupNode(node *v1.Node) *v1.Node {
	nodeCopy := node.DeepCopy()
	if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
		delete(nodeCopy.Labels, s.LabelKey)
	}
	return nodeCopy
}

func (*LabelNodePrepareStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
	return nil
}

func (*LabelNodePrepareStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
	return nil
}

// NodeAllocatableStrategy fills node.status.allocatable and csiNode.spec.drivers[*].allocatable.
// csiNode is created if it does not exist. On cleanup, any csiNode.spec.drivers[*].allocatable is
// set to nil.
type NodeAllocatableStrategy struct {
	// Node.status.allocatable to fill to all nodes.
	NodeAllocatable map[v1.ResourceName]string
	// Map <driver_name> -> VolumeNodeResources to fill into csiNode.spec.drivers[<driver_name>].
	CsiNodeAllocatable map[string]*storagev1.VolumeNodeResources
	// List of in-tree volume plugins migrated to CSI.
	MigratedPlugins []string
}

var _ PrepareNodeStrategy = &NodeAllocatableStrategy{}

func NewNodeAllocatableStrategy(nodeAllocatable map[v1.ResourceName]string, csiNodeAllocatable map[string]*storagev1.VolumeNodeResources, migratedPlugins []string) *NodeAllocatableStrategy {
	return &NodeAllocatableStrategy{
		NodeAllocatable:    nodeAllocatable,
		CsiNodeAllocatable: csiNodeAllocatable,
		MigratedPlugins:    migratedPlugins,
	}
}

func (s *NodeAllocatableStrategy) PreparePatch(node *v1.Node) []byte {
	newNode := node.DeepCopy()
	for name, value := range s.NodeAllocatable {
		newNode.Status.Allocatable[name] = resource.MustParse(value)
	}

	oldJSON, err := json.Marshal(node)
	if err != nil {
		panic(err)
	}
	newJSON, err := json.Marshal(newNode)
	if err != nil {
		panic(err)
	}

	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
	if err != nil {
		panic(err)
	}
	return patch
}

func (s *NodeAllocatableStrategy) CleanupNode(node *v1.Node) *v1.Node {
	nodeCopy := node.DeepCopy()
	for name := range s.NodeAllocatable {
		delete(nodeCopy.Status.Allocatable, name)
	}
	return nodeCopy
}

func (s *NodeAllocatableStrategy) createCSINode(nodeName string, client clientset.Interface) error {
	csiNode := &storagev1.CSINode{
		ObjectMeta: metav1.ObjectMeta{
			Name: nodeName,
			Annotations: map[string]string{
				v1.MigratedPluginsAnnotationKey: strings.Join(s.MigratedPlugins, ","),
			},
		},
		Spec: storagev1.CSINodeSpec{
			Drivers: []storagev1.CSINodeDriver{},
		},
	}

	for driver, allocatable := range s.CsiNodeAllocatable {
		d := storagev1.CSINodeDriver{
			Name:        driver,
			Allocatable: allocatable,
			NodeID:      nodeName,
		}
		csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
	}

	_, err := client.StorageV1().CSINodes().Create(context.TODO(), csiNode, metav1.CreateOptions{})
	if apierrors.IsAlreadyExists(err) {
		// Something created the CSINode instance after we checked that it did not exist.
		// Make the caller re-try PrepareDependentObjects by returning a Conflict error.
		err = apierrors.NewConflict(storagev1beta1.Resource("csinodes"), nodeName, err)
	}
	return err
}

func (s *NodeAllocatableStrategy) updateCSINode(csiNode *storagev1.CSINode, client clientset.Interface) error {
	for driverName, allocatable := range s.CsiNodeAllocatable {
		found := false
		for i, driver := range csiNode.Spec.Drivers {
			if driver.Name == driverName {
				found = true
				csiNode.Spec.Drivers[i].Allocatable = allocatable
				break
			}
		}
		if !found {
			d := storagev1.CSINodeDriver{
				Name:        driverName,
				Allocatable: allocatable,
			}

			csiNode.Spec.Drivers = append(csiNode.Spec.Drivers, d)
		}
	}
	csiNode.Annotations[v1.MigratedPluginsAnnotationKey] = strings.Join(s.MigratedPlugins, ",")

	_, err := client.StorageV1().CSINodes().Update(context.TODO(), csiNode, metav1.UpdateOptions{})
	return err
}

func (s *NodeAllocatableStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
	csiNode, err := client.StorageV1().CSINodes().Get(context.TODO(), node.Name, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			return s.createCSINode(node.Name, client)
		}
		return err
	}
	return s.updateCSINode(csiNode, client)
}

func (s *NodeAllocatableStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
	csiNode, err := client.StorageV1().CSINodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil
		}
		return err
	}

	for driverName := range s.CsiNodeAllocatable {
		for i, driver := range csiNode.Spec.Drivers {
			if driver.Name == driverName {
				csiNode.Spec.Drivers[i].Allocatable = nil
			}
		}
	}
	return s.updateCSINode(csiNode, client)
}

// UniqueNodeLabelStrategy sets a unique label for each node.
type UniqueNodeLabelStrategy struct {
	LabelKey string
}

var _ PrepareNodeStrategy = &UniqueNodeLabelStrategy{}

func NewUniqueNodeLabelStrategy(labelKey string) *UniqueNodeLabelStrategy {
	return &UniqueNodeLabelStrategy{
		LabelKey: labelKey,
	}
}

func (s *UniqueNodeLabelStrategy) PreparePatch(*v1.Node) []byte {
	labelString := fmt.Sprintf("{\"%v\":\"%v\"}", s.LabelKey, string(uuid.NewUUID()))
	patch := fmt.Sprintf(`{"metadata":{"labels":%v}}`, labelString)
	return []byte(patch)
}

func (s *UniqueNodeLabelStrategy) CleanupNode(node *v1.Node) *v1.Node {
	nodeCopy := node.DeepCopy()
	if node.Labels != nil && len(node.Labels[s.LabelKey]) != 0 {
		delete(nodeCopy.Labels, s.LabelKey)
	}
	return nodeCopy
}

func (*UniqueNodeLabelStrategy) PrepareDependentObjects(node *v1.Node, client clientset.Interface) error {
	return nil
}

func (*UniqueNodeLabelStrategy) CleanupDependentObjects(nodeName string, client clientset.Interface) error {
	return nil
}

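// DoPrepareNode applies the strategy's patch to the node and prepares its
// dependent objects, retrying up to 'retries' times on conflict errors.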
func DoPrepareNode(client clientset.Interface, node *v1.Node, strategy PrepareNodeStrategy) error {
	var err error
	patch := strategy.PreparePatch(node)
	if len(patch) == 0 {
		return nil
	}
	for attempt := 0; attempt < retries; attempt++ {
		if _, err = client.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}); err == nil {
			break
		}
		if !apierrors.IsConflict(err) {
			return fmt.Errorf("error while applying patch %v to Node %v: %v", string(patch), node.Name, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
	if err != nil {
		return fmt.Errorf("too many conflicts when applying patch %v to Node %v: %s", string(patch), node.Name, err)
	}

	for attempt := 0; attempt < retries; attempt++ {
		if err = strategy.PrepareDependentObjects(node, client); err == nil {
			break
		}
		if !apierrors.IsConflict(err) {
			return fmt.Errorf("error while preparing objects for node %s: %s", node.Name, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
	if err != nil {
		return fmt.Errorf("too many conflicts when creating objects for node %s: %s", node.Name, err)
	}
	return nil
}

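// DoCleanupNode reverts the strategy's modifications to the node and its
// dependent objects, retrying up to 'retries' times on conflict errors.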
func DoCleanupNode(client clientset.Interface, nodeName string, strategy PrepareNodeStrategy) error {
	var err error
	for attempt := 0; attempt < retries; attempt++ {
		var node *v1.Node
		node, err = client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("skipping cleanup of Node: failed to get Node %v: %v", nodeName, err)
		}
		updatedNode := strategy.CleanupNode(node)
		if apiequality.Semantic.DeepEqual(node, updatedNode) {
			return nil
		}
		if _, err = client.CoreV1().Nodes().Update(context.TODO(), updatedNode, metav1.UpdateOptions{}); err == nil {
			break
		}
		if !apierrors.IsConflict(err) {
			return fmt.Errorf("error when updating Node %v: %v", nodeName, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
	if err != nil {
		return fmt.Errorf("too many conflicts when trying to cleanup Node %v: %s", nodeName, err)
	}

	for attempt := 0; attempt < retries; attempt++ {
		err = strategy.CleanupDependentObjects(nodeName, client)
		if err == nil {
			break
		}
		if !apierrors.IsConflict(err) {
			return fmt.Errorf("error when cleaning up Node %v objects: %v", nodeName, err)
		}
		time.Sleep(100 * time.Millisecond)
	}
	if err != nil {
		return fmt.Errorf("too many conflicts when trying to cleanup Node %v objects: %s", nodeName, err)
	}
	return nil
}

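// TestPodCreateStrategy creates podCount pods in the given namespace.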
type TestPodCreateStrategy func(client clientset.Interface, namespace string, podCount int) error

type CountToPodStrategy struct {
	Count    int
	Strategy TestPodCreateStrategy
}

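// TestPodCreatorConfig maps a namespace to the pod-creation strategies to run in it.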
type TestPodCreatorConfig map[string][]CountToPodStrategy

func NewTestPodCreatorConfig() *TestPodCreatorConfig {
	config := make(TestPodCreatorConfig)
	return &config
}

func (c *TestPodCreatorConfig) AddStrategy(
	namespace string, podCount int, strategy TestPodCreateStrategy) {
	(*c)[namespace] = append((*c)[namespace], CountToPodStrategy{Count: podCount, Strategy: strategy})
}

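// TestPodCreator creates pods according to its TestPodCreatorConfig.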
type TestPodCreator struct {
 | 
						|
	Client clientset.Interface
 | 
						|
	// namespace -> count -> strategy
 | 
						|
	Config *TestPodCreatorConfig
 | 
						|
}
 | 
						|
 | 
						|
func NewTestPodCreator(client clientset.Interface, config *TestPodCreatorConfig) *TestPodCreator {
 | 
						|
	return &TestPodCreator{
 | 
						|
		Client: client,
 | 
						|
		Config: config,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (c *TestPodCreator) CreatePods() error {
 | 
						|
	for ns, v := range *(c.Config) {
 | 
						|
		for _, countToStrategy := range v {
 | 
						|
			if err := countToStrategy.Strategy(c.Client, ns, countToStrategy.Count); err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
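
// A minimal usage sketch (illustrative; the namespace name and pod count are
// arbitrary examples, not values used elsewhere in this package):
//
//	config := NewTestPodCreatorConfig()
//	config.AddStrategy("test-ns", 10, NewSimpleCreatePodStrategy())
//	creator := NewTestPodCreator(client, config)
//	if err := creator.CreatePods(); err != nil {
//		return err
//	}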

// MakePodSpec returns a pod spec with a single pause container that requests
// and limits 100m of CPU and 500Mi of memory.
func MakePodSpec() v1.PodSpec {
	return v1.PodSpec{
		Containers: []v1.Container{{
			Name:  "pause",
			Image: "k8s.gcr.io/pause:3.7",
			Ports: []v1.ContainerPort{{ContainerPort: 80}},
			Resources: v1.ResourceRequirements{
				Limits: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("100m"),
					v1.ResourceMemory: resource.MustParse("500Mi"),
				},
				Requests: v1.ResourceList{
					v1.ResourceCPU:    resource.MustParse("100m"),
					v1.ResourceMemory: resource.MustParse("500Mi"),
				},
			},
		}},
	}
}

func makeCreatePod(client clientset.Interface, namespace string, podTemplate *v1.Pod) error {
	if err := CreatePodWithRetries(client, namespace, podTemplate); err != nil {
		return fmt.Errorf("error creating pod: %v", err)
	}
	return nil
}

// CreatePod creates podCount copies of podTemplate in the given namespace,
// using up to 30 parallel workers, and returns an error if any creation failed.
func CreatePod(client clientset.Interface, namespace string, podCount int, podTemplate *v1.Pod) error {
	var createError error
	lock := sync.Mutex{}
	createPodFunc := func(i int) {
		if err := makeCreatePod(client, namespace, podTemplate); err != nil {
			lock.Lock()
			defer lock.Unlock()
			createError = err
		}
	}

	if podCount < 30 {
		workqueue.ParallelizeUntil(context.TODO(), podCount, podCount, createPodFunc)
	} else {
		workqueue.ParallelizeUntil(context.TODO(), 30, podCount, createPodFunc)
	}
	return createError
}

// CreatePodWithPersistentVolume creates count pods in the given namespace, each
// referencing its own pre-created PV/PVC pair. When bindVolume is true the PV and
// PVC are created already bound to each other.
func CreatePodWithPersistentVolume(client clientset.Interface, namespace string, claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod, count int, bindVolume bool) error {
	var createError error
	lock := sync.Mutex{}
	createPodFunc := func(i int) {
		pvcName := fmt.Sprintf("pvc-%d", i)
		// pvc
		pvc := claimTemplate.DeepCopy()
		pvc.Name = pvcName
		// pv
		pv := factory(i)
		// PVs are cluster-wide resources.
		// Prepend a namespace to make the name globally unique.
		pv.Name = fmt.Sprintf("%s-%s", namespace, pv.Name)
		if bindVolume {
			// bind pv to "pvc-$i"
			pv.Spec.ClaimRef = &v1.ObjectReference{
				Kind:       "PersistentVolumeClaim",
				Namespace:  namespace,
				Name:       pvcName,
				APIVersion: "v1",
			}
			pv.Status.Phase = v1.VolumeBound

			// bind pvc to "pv-$i"
			pvc.Spec.VolumeName = pv.Name
			pvc.Status.Phase = v1.ClaimBound
		} else {
			pv.Status.Phase = v1.VolumeAvailable
		}

		// Create the PVC first, as it is referenced by the PV when bindVolume is true.
		if err := CreatePersistentVolumeClaimWithRetries(client, namespace, pvc); err != nil {
			lock.Lock()
			defer lock.Unlock()
			createError = fmt.Errorf("error creating PVC: %s", err)
			return
		}

		// We need to update statuses separately, as creating pv/pvc resets status to the default one.
		if _, err := client.CoreV1().PersistentVolumeClaims(namespace).UpdateStatus(context.TODO(), pvc, metav1.UpdateOptions{}); err != nil {
			lock.Lock()
			defer lock.Unlock()
			createError = fmt.Errorf("error updating PVC status: %s", err)
			return
		}

		if err := CreatePersistentVolumeWithRetries(client, pv); err != nil {
			lock.Lock()
			defer lock.Unlock()
			createError = fmt.Errorf("error creating PV: %s", err)
			return
		}
		// We need to update statuses separately, as creating pv/pvc resets status to the default one.
		if _, err := client.CoreV1().PersistentVolumes().UpdateStatus(context.TODO(), pv, metav1.UpdateOptions{}); err != nil {
			lock.Lock()
			defer lock.Unlock()
			createError = fmt.Errorf("error updating PV status: %s", err)
			return
		}

		// pod
		pod := podTemplate.DeepCopy()
		pod.Spec.Volumes = []v1.Volume{
			{
				Name: "vol",
				VolumeSource: v1.VolumeSource{
					PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
						ClaimName: pvcName,
					},
				},
			},
		}
		if err := makeCreatePod(client, namespace, pod); err != nil {
			lock.Lock()
			defer lock.Unlock()
			createError = err
			return
		}
	}

	if count < 30 {
		workqueue.ParallelizeUntil(context.TODO(), count, count, createPodFunc)
	} else {
		workqueue.ParallelizeUntil(context.TODO(), 30, count, createPodFunc)
	}
	return createError
}

func createController(client clientset.Interface, controllerName, namespace string, podCount int, podTemplate *v1.Pod) error {
	rc := &v1.ReplicationController{
		ObjectMeta: metav1.ObjectMeta{
			Name: controllerName,
		},
		Spec: v1.ReplicationControllerSpec{
			Replicas: func(i int) *int32 { x := int32(i); return &x }(podCount),
			Selector: map[string]string{"name": controllerName},
			Template: &v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"name": controllerName},
				},
				Spec: podTemplate.Spec,
			},
		},
	}
	if err := CreateRCWithRetries(client, namespace, rc); err != nil {
		return fmt.Errorf("error creating replication controller: %v", err)
	}
	return nil
}

// NewCustomCreatePodStrategy returns a strategy that creates copies of the given pod template.
func NewCustomCreatePodStrategy(podTemplate *v1.Pod) TestPodCreateStrategy {
	return func(client clientset.Interface, namespace string, podCount int) error {
		return CreatePod(client, namespace, podCount, podTemplate)
	}
}

// volumeFactory creates a unique PersistentVolume for the given integer.
type volumeFactory func(uniqueID int) *v1.PersistentVolume
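
// A minimal sketch of a volumeFactory (illustrative only; the HostPath source and
// naming scheme are arbitrary examples, not something this package prescribes):
//
//	func hostPathVolumeFactory(id int) *v1.PersistentVolume {
//		return &v1.PersistentVolume{
//			ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pv-%d", id)},
//			Spec: v1.PersistentVolumeSpec{
//				AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
//				Capacity:    v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Gi")},
//				PersistentVolumeSource: v1.PersistentVolumeSource{
//					HostPath: &v1.HostPathVolumeSource{Path: fmt.Sprintf("/tmp/pv-%d", id)},
//				},
//			},
//		}
//	}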

func NewCreatePodWithPersistentVolumeStrategy(claimTemplate *v1.PersistentVolumeClaim, factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
	return func(client clientset.Interface, namespace string, podCount int) error {
		return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factory, podTemplate, podCount, true /* bindVolume */)
	}
}

func makeUnboundPersistentVolumeClaim(storageClass string) *v1.PersistentVolumeClaim {
	return &v1.PersistentVolumeClaim{
		Spec: v1.PersistentVolumeClaimSpec{
			AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
			StorageClassName: &storageClass,
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
				},
			},
		},
	}
}

// NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy is like
// NewCreatePodWithPersistentVolumeStrategy, but leaves the PVs and PVCs unbound
// and attaches them to a WaitForFirstConsumer StorageClass.
func NewCreatePodWithPersistentVolumeWithFirstConsumerStrategy(factory volumeFactory, podTemplate *v1.Pod) TestPodCreateStrategy {
	return func(client clientset.Interface, namespace string, podCount int) error {
		volumeBindingMode := storagev1.VolumeBindingWaitForFirstConsumer
		storageClass := &storagev1.StorageClass{
			ObjectMeta: metav1.ObjectMeta{
				Name: "storage-class-1",
			},
			Provisioner:       "kubernetes.io/gce-pd",
			VolumeBindingMode: &volumeBindingMode,
		}
		claimTemplate := makeUnboundPersistentVolumeClaim(storageClass.Name)

		if err := CreateStorageClassWithRetries(client, storageClass); err != nil {
			return fmt.Errorf("failed to create storage class: %v", err)
		}

		factoryWithStorageClass := func(i int) *v1.PersistentVolume {
			pv := factory(i)
			pv.Spec.StorageClassName = storageClass.Name
			return pv
		}

		return CreatePodWithPersistentVolume(client, namespace, claimTemplate, factoryWithStorageClass, podTemplate, podCount, false /* bindVolume */)
	}
}

func NewSimpleCreatePodStrategy() TestPodCreateStrategy {
	basePod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "simple-pod-",
		},
		Spec: MakePodSpec(),
	}
	return NewCustomCreatePodStrategy(basePod)
}

func NewSimpleWithControllerCreatePodStrategy(controllerName string) TestPodCreateStrategy {
	return func(client clientset.Interface, namespace string, podCount int) error {
		basePod := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: controllerName + "-pod-",
				Labels:       map[string]string{"name": controllerName},
			},
			Spec: MakePodSpec(),
		}
		if err := createController(client, controllerName, namespace, podCount, basePod); err != nil {
			return err
		}
		return CreatePod(client, namespace, podCount, basePod)
	}
}

// SecretConfig describes a Secret to be created from the given Content in a namespace.
type SecretConfig struct {
	Content   map[string]string
	Client    clientset.Interface
	Name      string
	Namespace string
	// If set, this function will be used to print log lines instead of klog.
	LogFunc func(fmt string, args ...interface{})
}

// Run creates the Secret described by the config.
func (config *SecretConfig) Run() error {
	secret := &v1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		StringData: map[string]string{},
	}
	for k, v := range config.Content {
		secret.StringData[k] = v
	}

	if err := CreateSecretWithRetries(config.Client, config.Namespace, secret); err != nil {
		return fmt.Errorf("error creating secret: %v", err)
	}
	config.LogFunc("Created secret %v/%v", config.Namespace, config.Name)
	return nil
}

// Stop deletes the Secret created by Run.
func (config *SecretConfig) Stop() error {
	if err := DeleteResourceWithRetries(config.Client, api.Kind("Secret"), config.Namespace, config.Name, metav1.DeleteOptions{}); err != nil {
		return fmt.Errorf("error deleting secret: %v", err)
	}
	config.LogFunc("Deleted secret %v/%v", config.Namespace, config.Name)
	return nil
}
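
// A minimal usage sketch (illustrative only; the names and content are arbitrary
// examples, and LogFunc must be set because Run/Stop call it unconditionally):
//
//	sc := &SecretConfig{
//		Client:    client,
//		Name:      "example-secret",
//		Namespace: "test-ns",
//		Content:   map[string]string{"key": "value"},
//		LogFunc:   klog.Infof,
//	}
//	if err := sc.Run(); err != nil {
//		return err
//	}
//	defer sc.Stop()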

// TODO: attach secrets using different possibilities: env vars, image pull secrets.
func attachSecrets(template *v1.PodTemplateSpec, secretNames []string) {
	volumes := make([]v1.Volume, 0, len(secretNames))
	mounts := make([]v1.VolumeMount, 0, len(secretNames))
	for _, name := range secretNames {
		volumes = append(volumes, v1.Volume{
			Name: name,
			VolumeSource: v1.VolumeSource{
				Secret: &v1.SecretVolumeSource{
					SecretName: name,
				},
			},
		})
		mounts = append(mounts, v1.VolumeMount{
			Name:      name,
			MountPath: fmt.Sprintf("/%v", name),
		})
	}

	template.Spec.Volumes = volumes
	template.Spec.Containers[0].VolumeMounts = mounts
}

// ConfigMapConfig describes a ConfigMap to be created from the given Content in a namespace.
type ConfigMapConfig struct {
	Content   map[string]string
	Client    clientset.Interface
	Name      string
	Namespace string
	// If set, this function will be used to print log lines instead of klog.
	LogFunc func(fmt string, args ...interface{})
}

// Run creates the ConfigMap described by the config.
func (config *ConfigMapConfig) Run() error {
	configMap := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Data: map[string]string{},
	}
	for k, v := range config.Content {
		configMap.Data[k] = v
	}

	if err := CreateConfigMapWithRetries(config.Client, config.Namespace, configMap); err != nil {
		return fmt.Errorf("error creating configmap: %v", err)
	}
	config.LogFunc("Created configmap %v/%v", config.Namespace, config.Name)
	return nil
}

// Stop deletes the ConfigMap created by Run.
func (config *ConfigMapConfig) Stop() error {
	if err := DeleteResourceWithRetries(config.Client, api.Kind("ConfigMap"), config.Namespace, config.Name, metav1.DeleteOptions{}); err != nil {
		return fmt.Errorf("error deleting configmap: %v", err)
	}
	config.LogFunc("Deleted configmap %v/%v", config.Namespace, config.Name)
	return nil
}

// TODO: attach configmaps using different possibilities: env vars.
func attachConfigMaps(template *v1.PodTemplateSpec, configMapNames []string) {
	volumes := make([]v1.Volume, 0, len(configMapNames))
	mounts := make([]v1.VolumeMount, 0, len(configMapNames))
	for _, name := range configMapNames {
		volumes = append(volumes, v1.Volume{
			Name: name,
			VolumeSource: v1.VolumeSource{
				ConfigMap: &v1.ConfigMapVolumeSource{
					LocalObjectReference: v1.LocalObjectReference{
						Name: name,
					},
				},
			},
		})
		mounts = append(mounts, v1.VolumeMount{
			Name:      name,
			MountPath: fmt.Sprintf("/%v", name),
		})
	}

	template.Spec.Volumes = volumes
	template.Spec.Containers[0].VolumeMounts = mounts
}

func (config *RCConfig) getTerminationGracePeriodSeconds(defaultGrace *int64) *int64 {
	if config.TerminationGracePeriodSeconds == nil || *config.TerminationGracePeriodSeconds < 0 {
		return defaultGrace
	}
	return config.TerminationGracePeriodSeconds
}

func attachServiceAccountTokenProjection(template *v1.PodTemplateSpec, name string) {
	template.Spec.Containers[0].VolumeMounts = append(template.Spec.Containers[0].VolumeMounts,
		v1.VolumeMount{
			Name:      name,
			MountPath: "/var/service-account-tokens/" + name,
		})

	template.Spec.Volumes = append(template.Spec.Volumes,
		v1.Volume{
			Name: name,
			VolumeSource: v1.VolumeSource{
				Projected: &v1.ProjectedVolumeSource{
					Sources: []v1.VolumeProjection{
						{
							ServiceAccountToken: &v1.ServiceAccountTokenProjection{
								Path:     "token",
								Audience: name,
							},
						},
						{
							ConfigMap: &v1.ConfigMapProjection{
								LocalObjectReference: v1.LocalObjectReference{
									Name: "kube-root-ca-crt",
								},
								Items: []v1.KeyToPath{
									{
										Key:  "ca.crt",
										Path: "ca.crt",
									},
								},
							},
						},
						{
							DownwardAPI: &v1.DownwardAPIProjection{
								Items: []v1.DownwardAPIVolumeFile{
									{
										Path: "namespace",
										FieldRef: &v1.ObjectFieldSelector{
											APIVersion: "v1",
											FieldPath:  "metadata.namespace",
										},
									},
								},
							},
						},
					},
				},
			},
		})
}

// DaemonConfig describes a DaemonSet to create and wait for.
type DaemonConfig struct {
	Client    clientset.Interface
	Name      string
	Namespace string
	Image     string
	// If set, this function will be used to print log lines instead of klog.
	LogFunc func(fmt string, args ...interface{})
	// How long we wait for the DaemonSet to become running.
	Timeout time.Duration
}

// Run creates the DaemonSet and waits until a ready daemon pod is running on every
// node, or until the configured timeout (default 5 minutes) expires.
func (config *DaemonConfig) Run() error {
	if config.Image == "" {
		config.Image = "k8s.gcr.io/pause:3.7"
	}
	nameLabel := map[string]string{
		"name": config.Name + "-daemon",
	}
	daemon := &apps.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{
			Name: config.Name,
		},
		Spec: apps.DaemonSetSpec{
			// Selector is required by apps/v1 and must match the template labels.
			Selector: &metav1.LabelSelector{
				MatchLabels: nameLabel,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: nameLabel,
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  config.Name,
							Image: config.Image,
						},
					},
				},
			},
		},
	}

	if err := CreateDaemonSetWithRetries(config.Client, config.Namespace, daemon); err != nil {
		return fmt.Errorf("error creating daemonset: %v", err)
	}

	var nodes *v1.NodeList
	var err error
	for i := 0; i < retries; i++ {
		// Wait for all daemons to be running
		nodes, err = config.Client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ResourceVersion: "0"})
		if err == nil {
			break
		} else if i+1 == retries {
			return fmt.Errorf("error listing Nodes while waiting for DaemonSet %v: %v", config.Name, err)
		}
	}

	timeout := config.Timeout
	if timeout <= 0 {
		timeout = 5 * time.Minute
	}

	ps, err := NewPodStore(config.Client, config.Namespace, labels.SelectorFromSet(nameLabel), fields.Everything())
	if err != nil {
		return err
	}
	defer ps.Stop()

	err = wait.Poll(time.Second, timeout, func() (bool, error) {
		pods := ps.List()

		nodeHasDaemon := sets.NewString()
		for _, pod := range pods {
			podReady, _ := PodRunningReady(pod)
			if pod.Spec.NodeName != "" && podReady {
				nodeHasDaemon.Insert(pod.Spec.NodeName)
			}
		}

		running := len(nodeHasDaemon)
		config.LogFunc("Found %v/%v Daemons %v running", running, len(nodes.Items), config.Name)
		return running == len(nodes.Items), nil
	})
	if err != nil {
		config.LogFunc("Timed out while waiting for DaemonSet %v/%v to be running.", config.Namespace, config.Name)
	} else {
		config.LogFunc("Created Daemon %v/%v", config.Namespace, config.Name)
	}

	return err
}
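
// A minimal usage sketch (illustrative only; the name and namespace are arbitrary
// examples, and LogFunc must be set because Run calls it unconditionally):
//
//	dc := &DaemonConfig{
//		Client:    client,
//		Name:      "example-daemon",
//		Namespace: "test-ns",
//		LogFunc:   klog.Infof,
//		Timeout:   2 * time.Minute,
//	}
//	if err := dc.Run(); err != nil {
//		return err
//	}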