diff --git a/config/configStruct.go b/config/configStruct.go index af3e13571..3a23dd94d 100644 --- a/config/configStruct.go +++ b/config/configStruct.go @@ -24,15 +24,16 @@ type KubeConfig struct { } type ConfigStruct struct { - Tap configStructs.TapConfig `yaml:"tap"` - Logs configStructs.LogsConfig `yaml:"logs"` - Config configStructs.ConfigConfig `yaml:"config,omitempty"` - Kube KubeConfig `yaml:"kube"` - DumpLogs bool `yaml:"dumplogs" default:"false"` - HeadlessMode bool `yaml:"headless" default:"false"` - License string `yaml:"license" default:""` - Scripting configStructs.ScriptingConfig `yaml:"scripting"` - ResourceLabels map[string]string `yaml:"resourceLabels" default:"{}"` + Tap configStructs.TapConfig `yaml:"tap"` + Logs configStructs.LogsConfig `yaml:"logs"` + Config configStructs.ConfigConfig `yaml:"config,omitempty"` + Kube KubeConfig `yaml:"kube"` + DumpLogs bool `yaml:"dumplogs" default:"false"` + HeadlessMode bool `yaml:"headless" default:"false"` + License string `yaml:"license" default:""` + Scripting configStructs.ScriptingConfig `yaml:"scripting"` + ResourceLabels map[string]string `yaml:"resourceLabels" default:"{}"` + NodeSelectorTerms []v1.NodeSelectorTerm `yaml:"nodeSelectorTerms" default:"[]"` } func (config *ConfigStruct) ImagePullPolicy() v1.PullPolicy { diff --git a/kubernetes/provider.go b/kubernetes/provider.go index 154cbe908..6326776eb 100644 --- a/kubernetes/provider.go +++ b/kubernetes/provider.go @@ -26,10 +26,6 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/version" "k8s.io/apimachinery/pkg/watch" - applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1" - applyconfcore "k8s.io/client-go/applyconfigurations/core/v1" - v1 "k8s.io/client-go/applyconfigurations/core/v1" - applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" "k8s.io/client-go/rest" @@ -184,19 +180,19 @@ type PodOptions struct { func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) { cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit) if err != nil { - return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName) + return nil, fmt.Errorf("invalid cpu limit for %s pod", opts.PodName) } memLimit, err := resource.ParseQuantity(opts.Resources.MemoryLimit) if err != nil { - return nil, fmt.Errorf("invalid memory limit for %s container", opts.PodName) + return nil, fmt.Errorf("invalid memory limit for %s pod", opts.PodName) } cpuRequests, err := resource.ParseQuantity(opts.Resources.CpuRequests) if err != nil { - return nil, fmt.Errorf("invalid cpu request for %s container", opts.PodName) + return nil, fmt.Errorf("invalid cpu request for %s pod", opts.PodName) } memRequests, err := resource.ParseQuantity(opts.Resources.MemoryRequests) if err != nil { - return nil, fmt.Errorf("invalid memory request for %s container", opts.PodName) + return nil, fmt.Errorf("invalid memory request for %s pod", opts.PodName) } command := []string{ @@ -251,6 +247,16 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) { }, } + if len(config.Config.NodeSelectorTerms) > 0 { + pod.Spec.Affinity = &core.Affinity{ + NodeAffinity: &core.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &core.NodeSelector{ + NodeSelectorTerms: config.Config.NodeSelectorTerms, + }, + }, + } + } + //define the service account only when it exists to prevent pod crash if opts.ServiceAccountName != "" { pod.Spec.ServiceAccountName = opts.ServiceAccountName @@ -261,19 +267,19 @@ func (provider *Provider) BuildHubPod(opts *PodOptions) (*core.Pod, error) { func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPort string) (*core.Pod, error) { cpuLimit, err := resource.ParseQuantity(opts.Resources.CpuLimit) if err != nil { - return nil, fmt.Errorf("invalid cpu limit for %s container", opts.PodName) + return nil, fmt.Errorf("invalid cpu limit for %s pod", opts.PodName) } memLimit, err := resource.ParseQuantity(opts.Resources.MemoryLimit) if err != nil { - return nil, fmt.Errorf("invalid memory limit for %s container", opts.PodName) + return nil, fmt.Errorf("invalid memory limit for %s pod", opts.PodName) } cpuRequests, err := resource.ParseQuantity(opts.Resources.CpuRequests) if err != nil { - return nil, fmt.Errorf("invalid cpu request for %s container", opts.PodName) + return nil, fmt.Errorf("invalid cpu request for %s pod", opts.PodName) } memRequests, err := resource.ParseQuantity(opts.Resources.MemoryRequests) if err != nil { - return nil, fmt.Errorf("invalid memory request for %s container", opts.PodName) + return nil, fmt.Errorf("invalid memory request for %s pod", opts.PodName) } volumeMounts := []core.VolumeMount{} @@ -349,6 +355,16 @@ func (provider *Provider) BuildFrontPod(opts *PodOptions, hubHost string, hubPor }, } + if len(config.Config.NodeSelectorTerms) > 0 { + pod.Spec.Affinity = &core.Affinity{ + NodeAffinity: &core.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &core.NodeSelector{ + NodeSelectorTerms: config.Config.NodeSelectorTerms, + }, + }, + } + } + //define the service account only when it exists to prevent pod crash if opts.ServiceAccountName != "" { pod.Spec.ServiceAccountName = opts.ServiceAccountName @@ -627,12 +643,206 @@ func (provider *Provider) handleRemovalError(err error) error { return err } +func (provider *Provider) BuildWorkerDaemonSet( + ctx context.Context, + podImage string, + podName string, + serviceAccountName string, + resources configStructs.Resources, + imagePullPolicy core.PullPolicy, + imagePullSecrets []core.LocalObjectReference, + serviceMesh bool, + tls bool, + debug bool, +) (*DaemonSet, error) { + // Resource limits + cpuLimit, err := resource.ParseQuantity(resources.CpuLimit) + if err != nil { + return nil, fmt.Errorf("invalid cpu limit for %s pod", podName) + } + memLimit, err := resource.ParseQuantity(resources.MemoryLimit) + if err != nil { + return nil, fmt.Errorf("invalid memory limit for %s pod", podName) + } + cpuRequests, err := resource.ParseQuantity(resources.CpuRequests) + if err != nil { + return nil, fmt.Errorf("invalid cpu request for %s pod", podName) + } + memRequests, err := resource.ParseQuantity(resources.MemoryRequests) + if err != nil { + return nil, fmt.Errorf("invalid memory request for %s pod", podName) + } + + // Command + command := []string{"./worker", "-i", "any", "-port", "8897"} + if debug { + command = append(command, "-debug") + } + if serviceMesh { + command = append(command, "-servicemesh") + } + if tls { + command = append(command, "-tls") + } + if serviceMesh || tls { + command = append(command, "-procfs", procfsMountPath) + } + + // Linux capabilities + dropCaps := []core.Capability{"ALL"} + addCaps := []core.Capability{"NET_RAW", "NET_ADMIN"} // to listen to traffic using libpcap + if serviceMesh || tls { + addCaps = append(addCaps, "SYS_ADMIN") // to read /proc/PID/net/ns + to install eBPF programs (kernel < 5.8) + addCaps = append(addCaps, "SYS_PTRACE") // to set netns to other process + to open libssl.so of other process + + if serviceMesh { + addCaps = append(addCaps, "DAC_OVERRIDE") // to read /proc/PID/environ + } + + if tls { + addCaps = append(addCaps, "SYS_RESOURCE") // to change rlimits for eBPF + } + } + + // Environment variables + var env []core.EnvVar + if debug { + env = append(env, core.EnvVar{ + Name: "MEMORY_PROFILING_ENABLED", + Value: "true", + }) + env = append(env, core.EnvVar{ + Name: "MEMORY_PROFILING_INTERVAL_SECONDS", + Value: "10", + }) + env = append(env, core.EnvVar{ + Name: "MEMORY_USAGE_INTERVAL_MILLISECONDS", + Value: "500", + }) + } + + // Volumes and volume mounts + + // Host procfs is needed inside the container because we need access to + // the network namespaces of processes on the machine. + procfsVolume := core.Volume{ + Name: procfsVolumeName, + VolumeSource: core.VolumeSource{ + HostPath: &core.HostPathVolumeSource{ + Path: "/proc", + }, + }, + } + procfsVolumeMount := core.VolumeMount{ + Name: procfsVolumeName, + MountPath: procfsMountPath, + ReadOnly: true, + } + + // We need access to /sys in order to install certain eBPF tracepoints + sysfsVolume := core.Volume{ + Name: sysfsVolumeName, + VolumeSource: core.VolumeSource{ + HostPath: &core.HostPathVolumeSource{ + Path: "/sys", + }, + }, + } + sysfsVolumeMount := core.VolumeMount{ + Name: sysfsVolumeName, + MountPath: sysfsMountPath, + ReadOnly: true, + } + + // Containers + containers := []core.Container{ + { + Name: podName, + Image: podImage, + ImagePullPolicy: imagePullPolicy, + VolumeMounts: []core.VolumeMount{procfsVolumeMount, sysfsVolumeMount}, + Command: command, + Resources: core.ResourceRequirements{ + Limits: core.ResourceList{ + "cpu": cpuLimit, + "memory": memLimit, + }, + Requests: core.ResourceList{ + "cpu": cpuRequests, + "memory": memRequests, + }, + }, + SecurityContext: &core.SecurityContext{ + Capabilities: &core.Capabilities{ + Add: addCaps, + Drop: dropCaps, + }, + }, + Env: env, + }, + } + + // Tolerations + tolerations := []core.Toleration{ + { + Operator: core.TolerationOpExists, + Effect: core.TaintEffectNoExecute, + }, + } + if !config.Config.Tap.IgnoreTainted { + tolerations = append(tolerations, core.Toleration{ + Operator: core.TolerationOpExists, + Effect: core.TaintEffectNoSchedule, + }) + } + + // Pod + pod := core.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Labels: buildWithDefaultLabels(map[string]string{ + "app": podName, + }, provider), + }, + Spec: core.PodSpec{ + HostNetwork: true, + Containers: containers, + Volumes: []core.Volume{procfsVolume, sysfsVolume}, + DNSPolicy: core.DNSClusterFirstWithHostNet, + TerminationGracePeriodSeconds: new(int64), + Tolerations: tolerations, + ImagePullSecrets: imagePullSecrets, + }, + } + + if len(config.Config.NodeSelectorTerms) > 0 { + pod.Spec.Affinity = &core.Affinity{ + NodeAffinity: &core.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &core.NodeSelector{ + NodeSelectorTerms: config.Config.NodeSelectorTerms, + }, + }, + } + } + + return &DaemonSet{ + Spec: DaemonSetSpec{ + Selector: metav1.LabelSelector{ + MatchLabels: buildWithDefaultLabels(map[string]string{ + "app": podName, + }, provider), + }, + Template: pod, + }, + }, nil +} + func (provider *Provider) ApplyWorkerDaemonSet( ctx context.Context, namespace string, daemonSetName string, podImage string, - workerPodName string, + podName string, serviceAccountName string, resources configStructs.Resources, imagePullPolicy core.PullPolicy, @@ -645,210 +855,40 @@ func (provider *Provider) ApplyWorkerDaemonSet( Str("namespace", namespace). Str("daemonset-name", daemonSetName). Str("image", podImage). - Str("pod", workerPodName). + Str("pod", podName). Msg("Applying worker DaemonSets.") - command := []string{"./worker", "-i", "any", "-port", "8897"} - - if debug { - command = append(command, "-debug") - } - - if serviceMesh { - command = append(command, "-servicemesh") - } - - if tls { - command = append(command, "-tls") - } - - if serviceMesh || tls { - command = append(command, "-procfs", procfsMountPath) - } - - workerContainer := applyconfcore.Container() - workerContainer.WithName(workerPodName) - workerContainer.WithImage(podImage) - workerContainer.WithImagePullPolicy(imagePullPolicy) - - caps := applyconfcore.Capabilities().WithDrop("ALL") - - caps = caps.WithAdd("NET_RAW").WithAdd("NET_ADMIN") // to listen to traffic using libpcap - - if serviceMesh || tls { - caps = caps.WithAdd("SYS_ADMIN") // to read /proc/PID/net/ns + to install eBPF programs (kernel < 5.8) - caps = caps.WithAdd("SYS_PTRACE") // to set netns to other process + to open libssl.so of other process - - if serviceMesh { - caps = caps.WithAdd("DAC_OVERRIDE") // to read /proc/PID/environ - } - - if tls { - caps = caps.WithAdd("SYS_RESOURCE") // to change rlimits for eBPF - } - } - - workerContainer.WithSecurityContext(applyconfcore.SecurityContext().WithCapabilities(caps)) - - workerContainer.WithCommand(command...) - - var envvars []*v1.EnvVarApplyConfiguration - - // Worker build with -race flag requires the GODEBUG=netdns=go - // envvars = append(envvars, applyconfcore.EnvVar().WithName("GODEBUG").WithValue("netdns=go")) - - if debug { - envvars = append(envvars, applyconfcore.EnvVar().WithName("MEMORY_PROFILING_ENABLED").WithValue("true")) - envvars = append(envvars, applyconfcore.EnvVar().WithName("MEMORY_PROFILING_INTERVAL_SECONDS").WithValue("10")) - envvars = append(envvars, applyconfcore.EnvVar().WithName("MEMORY_USAGE_INTERVAL_MILLISECONDS").WithValue("500")) - } - - workerContainer.WithEnv(envvars...) - - cpuLimit, err := resource.ParseQuantity(resources.CpuLimit) + daemonSet, err := provider.BuildWorkerDaemonSet( + ctx, + podImage, + podName, + serviceAccountName, + resources, + imagePullPolicy, + imagePullSecrets, + serviceMesh, + tls, + debug, + ) if err != nil { - return fmt.Errorf("invalid cpu limit for %s container", workerPodName) + return err } - memLimit, err := resource.ParseQuantity(resources.MemoryLimit) - if err != nil { - return fmt.Errorf("invalid memory limit for %s container", workerPodName) - } - cpuRequests, err := resource.ParseQuantity(resources.CpuRequests) - if err != nil { - return fmt.Errorf("invalid cpu request for %s container", workerPodName) - } - memRequests, err := resource.ParseQuantity(resources.MemoryRequests) - if err != nil { - return fmt.Errorf("invalid memory request for %s container", workerPodName) - } - workerResourceLimits := core.ResourceList{ - "cpu": cpuLimit, - "memory": memLimit, - } - workerResourceRequests := core.ResourceList{ - "cpu": cpuRequests, - "memory": memRequests, - } - workerResources := applyconfcore.ResourceRequirements().WithRequests(workerResourceRequests).WithLimits(workerResourceLimits) - workerContainer.WithResources(workerResources) - - nodeAffinity := applyconfcore.NodeAffinity() - affinity := applyconfcore.Affinity() - affinity.WithNodeAffinity(nodeAffinity) - - var tolerations []*v1.TolerationApplyConfiguration - - noExecuteToleration := applyconfcore.Toleration() - noExecuteToleration.WithOperator(core.TolerationOpExists) - noExecuteToleration.WithEffect(core.TaintEffectNoExecute) - tolerations = append(tolerations, noExecuteToleration) - - noScheduleToleration := applyconfcore.Toleration() - noScheduleToleration.WithOperator(core.TolerationOpExists) - noScheduleToleration.WithEffect(core.TaintEffectNoSchedule) - if !config.Config.Tap.IgnoreTainted { - tolerations = append(tolerations, noScheduleToleration) - } - - // Host procfs is needed inside the container because we need access to - // the network namespaces of processes on the machine. - // - procfsVolume := applyconfcore.Volume() - procfsVolume.WithName(procfsVolumeName).WithHostPath(applyconfcore.HostPathVolumeSource().WithPath("/proc")) - procfsVolumeMount := applyconfcore.VolumeMount().WithName(procfsVolumeName).WithMountPath(procfsMountPath).WithReadOnly(true) - workerContainer.WithVolumeMounts(procfsVolumeMount) - - // We need access to /sys in order to install certain eBPF tracepoints - // - sysfsVolume := applyconfcore.Volume() - sysfsVolume.WithName(sysfsVolumeName).WithHostPath(applyconfcore.HostPathVolumeSource().WithPath("/sys")) - sysfsVolumeMount := applyconfcore.VolumeMount().WithName(sysfsVolumeName).WithMountPath(sysfsMountPath).WithReadOnly(true) - workerContainer.WithVolumeMounts(sysfsVolumeMount) - - podSpec := applyconfcore.PodSpec() - podSpec.WithHostNetwork(true) - podSpec.WithDNSPolicy(core.DNSClusterFirstWithHostNet) - podSpec.WithTerminationGracePeriodSeconds(0) - if serviceAccountName != "" { - podSpec.WithServiceAccountName(serviceAccountName) - } - podSpec.WithContainers(workerContainer) - podSpec.WithAffinity(affinity) - podSpec.WithTolerations(tolerations...) - podSpec.WithVolumes(procfsVolume, sysfsVolume) - - if len(imagePullSecrets) > 0 { - localObjectReference := applyconfcore.LocalObjectReference() - for _, secret := range imagePullSecrets { - localObjectReference.WithName(secret.Name) - } - podSpec.WithImagePullSecrets(localObjectReference) - } - - podTemplate := applyconfcore.PodTemplateSpec() - podTemplate.WithLabels(buildWithDefaultLabels(map[string]string{ - "app": workerPodName, - }, provider)) - podTemplate.WithSpec(podSpec) - - labelSelector := applyconfmeta.LabelSelector() - labelSelector.WithMatchLabels(map[string]string{"app": workerPodName}) applyOptions := metav1.ApplyOptions{ Force: true, FieldManager: fieldManagerName, } - daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace) - daemonSet. - WithLabels(buildWithDefaultLabels(map[string]string{}, provider)). - WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate)) - - _, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions) - return err -} - -func (provider *Provider) ResetWorkerDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, workerPodName string) error { - workerContainer := applyconfcore.Container() - workerContainer.WithName(workerPodName) - workerContainer.WithImage(podImage) - - nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement() - nodeSelectorRequirement.WithKey(fmt.Sprintf("%s-non-existing-label", misc.Program)) - nodeSelectorRequirement.WithOperator(core.NodeSelectorOpExists) - nodeSelectorTerm := applyconfcore.NodeSelectorTerm() - nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement) - nodeSelector := applyconfcore.NodeSelector() - nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm) - nodeAffinity := applyconfcore.NodeAffinity() - nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector) - affinity := applyconfcore.Affinity() - affinity.WithNodeAffinity(nodeAffinity) - - podSpec := applyconfcore.PodSpec() - podSpec.WithContainers(workerContainer) - podSpec.WithAffinity(affinity) - - podTemplate := applyconfcore.PodTemplateSpec() - podTemplate.WithLabels(buildWithDefaultLabels(map[string]string{ - "app": workerPodName, - }, provider)) - podTemplate.WithSpec(podSpec) - - labelSelector := applyconfmeta.LabelSelector() - labelSelector.WithMatchLabels(map[string]string{"app": workerPodName}) - - applyOptions := metav1.ApplyOptions{ - Force: true, - FieldManager: fieldManagerName, - } - - daemonSet := applyconfapp.DaemonSet(daemonSetName, namespace) - daemonSet. - WithLabels(buildWithDefaultLabels(map[string]string{}, provider)). - WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate)) - - _, err := provider.clientSet.AppsV1().DaemonSets(namespace).Apply(ctx, daemonSet, applyOptions) + _, err = provider.clientSet.AppsV1().DaemonSets(namespace).Apply( + ctx, + daemonSet.GenerateApplyConfiguration( + daemonSetName, + namespace, + podName, + provider, + ), + applyOptions, + ) return err } diff --git a/kubernetes/types.go b/kubernetes/types.go new file mode 100644 index 000000000..14b9cd6c5 --- /dev/null +++ b/kubernetes/types.go @@ -0,0 +1,137 @@ +package kubernetes + +import ( + core "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1" + applyconfcore "k8s.io/client-go/applyconfigurations/core/v1" + v1 "k8s.io/client-go/applyconfigurations/core/v1" + applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1" +) + +type DaemonSetSpec struct { + Selector metav1.LabelSelector `json:"selector,omitempty" protobuf:"bytes,1,opt,name=selector"` + Template core.Pod `json:"template,omitempty" protobuf:"bytes,2,opt,name=template"` +} + +type DaemonSet struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Spec DaemonSetSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"` +} + +func (d *DaemonSet) GenerateApplyConfiguration(name string, namespace string, podName string, provider *Provider) *applyconfapp.DaemonSetApplyConfiguration { + // Pod + p := d.Spec.Template.Spec + podSpec := applyconfcore.PodSpec() + podSpec.WithHostNetwork(p.HostNetwork) + podSpec.WithDNSPolicy(p.DNSPolicy) + podSpec.WithTerminationGracePeriodSeconds(*p.TerminationGracePeriodSeconds) + podSpec.WithServiceAccountName(p.ServiceAccountName) + + // Containers + for _, c := range d.Spec.Template.Spec.Containers { + // Common + container := applyconfcore.Container() + container.WithName(c.Name) + container.WithImage(c.Image) + container.WithImagePullPolicy(c.ImagePullPolicy) + container.WithCommand(c.Command...) + + // Linux capabilities + caps := applyconfcore.Capabilities().WithAdd(c.SecurityContext.Capabilities.Add...).WithDrop(c.SecurityContext.Capabilities.Drop...) + container.WithSecurityContext(applyconfcore.SecurityContext().WithCapabilities(caps)) + + // Environment variables + var envvars []*v1.EnvVarApplyConfiguration + for _, e := range c.Env { + envvars = append(envvars, applyconfcore.EnvVar().WithName(e.Name).WithValue(e.Value)) + } + container.WithEnv(envvars...) + + // Resource limits + resources := applyconfcore.ResourceRequirements().WithRequests(c.Resources.Requests).WithLimits(c.Resources.Limits) + container.WithResources(resources) + + // Volume mounts + for _, m := range c.VolumeMounts { + volumeMount := applyconfcore.VolumeMount().WithName(m.Name).WithMountPath(m.MountPath).WithReadOnly(m.ReadOnly) + container.WithVolumeMounts(volumeMount) + } + + podSpec.WithContainers(container) + } + + // Node affinity (RequiredDuringSchedulingIgnoredDuringExecution only) + if p.Affinity != nil { + nodeSelector := applyconfcore.NodeSelector() + for _, term := range p.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms { + nodeSelectorTerm := applyconfcore.NodeSelectorTerm() + for _, selector := range term.MatchExpressions { + nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement() + nodeSelectorRequirement.WithKey(selector.Key) + nodeSelectorRequirement.WithOperator(selector.Operator) + nodeSelectorRequirement.WithValues(selector.Values...) + nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement) + } + for _, selector := range term.MatchFields { + nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement() + nodeSelectorRequirement.WithKey(selector.Key) + nodeSelectorRequirement.WithOperator(selector.Operator) + nodeSelectorRequirement.WithValues(selector.Values...) + nodeSelectorTerm.WithMatchFields(nodeSelectorRequirement) + } + nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm) + } + nodeAffinity := applyconfcore.NodeAffinity() + nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector) + affinity := applyconfcore.Affinity() + affinity.WithNodeAffinity(nodeAffinity) + podSpec.WithAffinity(affinity) + } + + // Tolerations + for _, t := range p.Tolerations { + toleration := applyconfcore.Toleration() + toleration.WithKey(t.Key) + toleration.WithOperator(t.Operator) + toleration.WithValue(t.Value) + toleration.WithEffect(t.Effect) + if t.TolerationSeconds != nil { + toleration.WithTolerationSeconds(*t.TolerationSeconds) + } + podSpec.WithTolerations(toleration) + } + + // Volumes + for _, v := range p.Volumes { + volume := applyconfcore.Volume() + volume.WithName(v.Name).WithHostPath(applyconfcore.HostPathVolumeSource().WithPath(v.HostPath.Path)) + podSpec.WithVolumes(volume) + } + + // Image pull secrets + if len(p.ImagePullSecrets) > 0 { + localObjectReference := applyconfcore.LocalObjectReference() + for _, o := range p.ImagePullSecrets { + localObjectReference.WithName(o.Name) + } + podSpec.WithImagePullSecrets(localObjectReference) + } + + podTemplate := applyconfcore.PodTemplateSpec() + podTemplate.WithLabels(buildWithDefaultLabels(map[string]string{ + "app": podName, + }, provider)) + podTemplate.WithSpec(podSpec) + + labelSelector := applyconfmeta.LabelSelector() + labelSelector.WithMatchLabels(map[string]string{"app": podName}) + + daemonSet := applyconfapp.DaemonSet(name, namespace) + daemonSet. + WithLabels(buildWithDefaultLabels(map[string]string{}, provider)). + WithSpec(applyconfapp.DaemonSetSpec().WithSelector(labelSelector).WithTemplate(podTemplate)) + + return daemonSet +}