feat(k8s): Kubernetes namespace per organization (#5309)

This commit is contained in:
Henrik Huitti
2025-07-22 17:22:26 +03:00
committed by GitHub
parent 5c9fc61619
commit 79e4dd5380
12 changed files with 229 additions and 29 deletions

View File

@@ -23,6 +23,8 @@ import (
"os"
"runtime"
"slices"
"strconv"
"strings"
"time"
"github.com/rs/zerolog/log"
@@ -56,6 +58,7 @@ type kube struct {
type config struct {
Namespace string
EnableNamespacePerOrg bool
StorageClass string
VolumeSize string
StorageRwx bool
@@ -68,6 +71,14 @@ type config struct {
SecurityContext SecurityContextConfig
NativeSecretsAllowFromStep bool
}
func (c *config) GetNamespace(orgID int64) string {
if c.EnableNamespacePerOrg {
return strings.ToLower(fmt.Sprintf("%s-%s", c.Namespace, strconv.FormatInt(orgID, 10)))
}
return c.Namespace
}
type SecurityContextConfig struct {
RunAsNonRoot bool
FSGroup *int64
@@ -88,6 +99,7 @@ func configFromCliContext(ctx context.Context) (*config, error) {
if c, ok := ctx.Value(types.CliCommand).(*cli.Command); ok {
config := config{
Namespace: c.String("backend-k8s-namespace"),
EnableNamespacePerOrg: c.Bool("backend-k8s-namespace-per-org"),
StorageClass: c.String("backend-k8s-storage-class"),
VolumeSize: c.String("backend-k8s-volume-size"),
StorageRwx: c.Bool("backend-k8s-storage-rwx"),
@@ -191,7 +203,16 @@ func (e *kube) getConfig() *config {
func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")
_, err := startVolume(ctx, e, conf.Volume)
namespace := e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID)
if e.config.EnableNamespacePerOrg {
err := mkNamespace(ctx, e.client.CoreV1().Namespaces(), namespace)
if err != nil {
return err
}
}
_, err := startVolume(ctx, e, conf.Volume, namespace)
if err != nil {
return err
}
@@ -276,7 +297,7 @@ func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string)
}
}
si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(e.config.Namespace))
si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(e.config.GetNamespace(step.OrgID)))
if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: podUpdated,
@@ -292,7 +313,7 @@ func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string)
// TODO: Cancel on ctx.Done
<-finished
pod, err := e.client.CoreV1().Pods(e.config.Namespace).Get(ctx, podName, meta_v1.GetOptions{})
pod, err := e.client.CoreV1().Pods(e.config.GetNamespace(step.OrgID)).Get(ctx, podName, meta_v1.GetOptions{})
if err != nil {
return nil, err
}
@@ -351,7 +372,7 @@ func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string)
}
}
si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(e.config.Namespace))
si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(e.config.GetNamespace(step.OrgID)))
if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: podUpdated,
@@ -372,7 +393,7 @@ func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string)
}
logs, err := e.client.CoreV1().RESTClient().Get().
Namespace(e.config.Namespace).
Namespace(e.config.GetNamespace(step.OrgID)).
Name(podName).
Resource("pods").
SubResource("log").
@@ -439,7 +460,7 @@ func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID
}
}
err := stopVolume(ctx, e, conf.Volume, defaultDeleteOptions)
err := stopVolume(ctx, e, conf.Volume, e.config.GetNamespace(conf.Stages[0].Steps[0].OrgID), defaultDeleteOptions)
if err != nil {
return err
}