mirror of
https://github.com/woodpecker-ci/woodpecker.git
synced 2025-10-22 07:42:23 +00:00
Move cncd/pipeline/pipeline/ to pipeline/ (#347)
* Refactor: move cncd/pipeline/ to pipeline/ * Refactor: move pipeline/pipeline/ to pipeline/
This commit is contained in:
178
pipeline/backend/docker/convert.go
Normal file
178
pipeline/backend/docker/convert.go
Normal file
@@ -0,0 +1,178 @@
|
||||
package docker
|
||||
|
||||
import (
|
||||
"docker.io/go-docker/api/types/container"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/pipeline/backend"
|
||||
)
|
||||
|
||||
// returns a container configuration.
|
||||
func toConfig(proc *backend.Step) *container.Config {
|
||||
config := &container.Config{
|
||||
Image: proc.Image,
|
||||
Labels: proc.Labels,
|
||||
WorkingDir: proc.WorkingDir,
|
||||
AttachStdout: true,
|
||||
AttachStderr: true,
|
||||
}
|
||||
if len(proc.Environment) != 0 {
|
||||
config.Env = toEnv(proc.Environment)
|
||||
}
|
||||
if len(proc.Command) != 0 {
|
||||
config.Cmd = proc.Command
|
||||
}
|
||||
if len(proc.Entrypoint) != 0 {
|
||||
config.Entrypoint = proc.Entrypoint
|
||||
}
|
||||
if len(proc.Volumes) != 0 {
|
||||
config.Volumes = toVol(proc.Volumes)
|
||||
}
|
||||
return config
|
||||
}
|
||||
|
||||
// returns a container host configuration.
|
||||
func toHostConfig(proc *backend.Step) *container.HostConfig {
|
||||
config := &container.HostConfig{
|
||||
Resources: container.Resources{
|
||||
CPUQuota: proc.CPUQuota,
|
||||
CPUShares: proc.CPUShares,
|
||||
CpusetCpus: proc.CPUSet,
|
||||
Memory: proc.MemLimit,
|
||||
MemorySwap: proc.MemSwapLimit,
|
||||
},
|
||||
LogConfig: container.LogConfig{
|
||||
Type: "json-file",
|
||||
},
|
||||
Privileged: proc.Privileged,
|
||||
ShmSize: proc.ShmSize,
|
||||
Sysctls: proc.Sysctls,
|
||||
}
|
||||
|
||||
// if len(proc.VolumesFrom) != 0 {
|
||||
// config.VolumesFrom = proc.VolumesFrom
|
||||
// }
|
||||
if len(proc.NetworkMode) != 0 {
|
||||
config.NetworkMode = container.NetworkMode(proc.NetworkMode)
|
||||
}
|
||||
if len(proc.IpcMode) != 0 {
|
||||
config.IpcMode = container.IpcMode(proc.IpcMode)
|
||||
}
|
||||
if len(proc.DNS) != 0 {
|
||||
config.DNS = proc.DNS
|
||||
}
|
||||
if len(proc.DNSSearch) != 0 {
|
||||
config.DNSSearch = proc.DNSSearch
|
||||
}
|
||||
if len(proc.ExtraHosts) != 0 {
|
||||
config.ExtraHosts = proc.ExtraHosts
|
||||
}
|
||||
if len(proc.Devices) != 0 {
|
||||
config.Devices = toDev(proc.Devices)
|
||||
}
|
||||
if len(proc.Volumes) != 0 {
|
||||
config.Binds = proc.Volumes
|
||||
}
|
||||
config.Tmpfs = map[string]string{}
|
||||
for _, path := range proc.Tmpfs {
|
||||
if strings.Index(path, ":") == -1 {
|
||||
config.Tmpfs[path] = ""
|
||||
continue
|
||||
}
|
||||
parts, err := splitVolumeParts(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
config.Tmpfs[parts[0]] = parts[1]
|
||||
}
|
||||
// if proc.OomKillDisable {
|
||||
// config.OomKillDisable = &proc.OomKillDisable
|
||||
// }
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
// helper function that converts a slice of volume paths to a set of
|
||||
// unique volume names.
|
||||
func toVol(paths []string) map[string]struct{} {
|
||||
set := map[string]struct{}{}
|
||||
for _, path := range paths {
|
||||
parts, err := splitVolumeParts(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if len(parts) < 2 {
|
||||
continue
|
||||
}
|
||||
set[parts[1]] = struct{}{}
|
||||
}
|
||||
return set
|
||||
}
|
||||
|
||||
// helper function that converts a key value map of environment variables to a
|
||||
// string slice in key=value format.
|
||||
func toEnv(env map[string]string) []string {
|
||||
var envs []string
|
||||
for k, v := range env {
|
||||
envs = append(envs, k+"="+v)
|
||||
}
|
||||
return envs
|
||||
}
|
||||
|
||||
// helper function that converts a slice of device paths to a slice of
|
||||
// container.DeviceMapping.
|
||||
func toDev(paths []string) []container.DeviceMapping {
|
||||
var devices []container.DeviceMapping
|
||||
for _, path := range paths {
|
||||
parts, err := splitVolumeParts(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if len(parts) < 2 {
|
||||
continue
|
||||
}
|
||||
if strings.HasSuffix(parts[1], ":ro") || strings.HasSuffix(parts[1], ":rw") {
|
||||
parts[1] = parts[1][:len(parts[1])-1]
|
||||
}
|
||||
devices = append(devices, container.DeviceMapping{
|
||||
PathOnHost: parts[0],
|
||||
PathInContainer: parts[1],
|
||||
CgroupPermissions: "rwm",
|
||||
})
|
||||
}
|
||||
return devices
|
||||
}
|
||||
|
||||
// helper function that serializes the auth configuration as JSON
|
||||
// base64 payload.
|
||||
func encodeAuthToBase64(authConfig backend.Auth) (string, error) {
|
||||
buf, err := json.Marshal(authConfig)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return base64.URLEncoding.EncodeToString(buf), nil
|
||||
}
|
||||
|
||||
// helper function that split volume path
|
||||
func splitVolumeParts(volumeParts string) ([]string, error) {
|
||||
pattern := `^((?:[\w]\:)?[^\:]*)\:((?:[\w]\:)?[^\:]*)(?:\:([rwom]*))?`
|
||||
r, err := regexp.Compile(pattern)
|
||||
if err != nil {
|
||||
return []string{}, err
|
||||
}
|
||||
if r.MatchString(volumeParts) {
|
||||
results := r.FindStringSubmatch(volumeParts)[1:]
|
||||
cleanResults := []string{}
|
||||
for _, item := range results {
|
||||
if item != "" {
|
||||
cleanResults = append(cleanResults, item)
|
||||
}
|
||||
}
|
||||
return cleanResults, nil
|
||||
} else {
|
||||
return strings.Split(volumeParts, ":"), nil
|
||||
}
|
||||
}
|
74
pipeline/backend/docker/convert_test.go
Normal file
74
pipeline/backend/docker/convert_test.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package docker
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSplitVolumeParts(t *testing.T) {
|
||||
testdata := []struct {
|
||||
from string
|
||||
to []string
|
||||
success bool
|
||||
}{
|
||||
{
|
||||
from: `Z::Z::rw`,
|
||||
to: []string{`Z:`, `Z:`, `rw`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `Z:\:Z:\:rw`,
|
||||
to: []string{`Z:\`, `Z:\`, `rw`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `Z:\git\refs:Z:\git\refs:rw`,
|
||||
to: []string{`Z:\git\refs`, `Z:\git\refs`, `rw`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `Z:\git\refs:Z:\git\refs`,
|
||||
to: []string{`Z:\git\refs`, `Z:\git\refs`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `Z:/:Z:/:rw`,
|
||||
to: []string{`Z:/`, `Z:/`, `rw`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `Z:/git/refs:Z:/git/refs:rw`,
|
||||
to: []string{`Z:/git/refs`, `Z:/git/refs`, `rw`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `Z:/git/refs:Z:/git/refs`,
|
||||
to: []string{`Z:/git/refs`, `Z:/git/refs`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `/test:/test`,
|
||||
to: []string{`/test`, `/test`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `test:/test`,
|
||||
to: []string{`test`, `/test`},
|
||||
success: true,
|
||||
},
|
||||
{
|
||||
from: `test:test`,
|
||||
to: []string{`test`, `test`},
|
||||
success: true,
|
||||
},
|
||||
}
|
||||
for _, test := range testdata {
|
||||
results, err := splitVolumeParts(test.from)
|
||||
if test.success == (err != nil) {
|
||||
} else {
|
||||
if reflect.DeepEqual(results, test.to) != test.success {
|
||||
t.Errorf("Expect %q matches %q is %v", test.from, results, test.to)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
208
pipeline/backend/docker/docker.go
Normal file
208
pipeline/backend/docker/docker.go
Normal file
@@ -0,0 +1,208 @@
|
||||
package docker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/docker/docker/pkg/jsonmessage"
|
||||
"github.com/docker/docker/pkg/stdcopy"
|
||||
"github.com/docker/docker/pkg/term"
|
||||
"io"
|
||||
"os"
|
||||
|
||||
"github.com/woodpecker-ci/woodpecker/pipeline/backend"
|
||||
|
||||
"docker.io/go-docker"
|
||||
"docker.io/go-docker/api/types"
|
||||
"docker.io/go-docker/api/types/network"
|
||||
"docker.io/go-docker/api/types/volume"
|
||||
)
|
||||
|
||||
type engine struct {
|
||||
client docker.APIClient
|
||||
}
|
||||
|
||||
// New returns a new Docker Engine using the given client.
|
||||
func New(cli docker.APIClient) backend.Engine {
|
||||
return &engine{
|
||||
client: cli,
|
||||
}
|
||||
}
|
||||
|
||||
// NewEnv returns a new Docker Engine using the client connection
|
||||
// environment variables.
|
||||
func NewEnv() (backend.Engine, error) {
|
||||
cli, err := docker.NewEnvClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return New(cli), nil
|
||||
}
|
||||
|
||||
func (e *engine) Setup(_ context.Context, conf *backend.Config) error {
|
||||
for _, vol := range conf.Volumes {
|
||||
_, err := e.client.VolumeCreate(noContext, volume.VolumesCreateBody{
|
||||
Name: vol.Name,
|
||||
Driver: vol.Driver,
|
||||
DriverOpts: vol.DriverOpts,
|
||||
// Labels: defaultLabels,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, network := range conf.Networks {
|
||||
_, err := e.client.NetworkCreate(noContext, network.Name, types.NetworkCreate{
|
||||
Driver: network.Driver,
|
||||
Options: network.DriverOpts,
|
||||
// Labels: defaultLabels,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *engine) Exec(ctx context.Context, proc *backend.Step) error {
|
||||
config := toConfig(proc)
|
||||
hostConfig := toHostConfig(proc)
|
||||
|
||||
// create pull options with encoded authorization credentials.
|
||||
pullopts := types.ImagePullOptions{}
|
||||
if proc.AuthConfig.Username != "" && proc.AuthConfig.Password != "" {
|
||||
pullopts.RegistryAuth, _ = encodeAuthToBase64(proc.AuthConfig)
|
||||
}
|
||||
|
||||
// automatically pull the latest version of the image if requested
|
||||
// by the process configuration.
|
||||
if proc.Pull {
|
||||
responseBody, perr := e.client.ImagePull(ctx, config.Image, pullopts)
|
||||
if perr == nil {
|
||||
defer responseBody.Close()
|
||||
|
||||
fd, isTerminal := term.GetFdInfo(os.Stdout)
|
||||
jsonmessage.DisplayJSONMessagesStream(responseBody, os.Stdout, fd, isTerminal, nil)
|
||||
}
|
||||
// fix for drone/drone#1917
|
||||
if perr != nil && proc.AuthConfig.Password != "" {
|
||||
return perr
|
||||
}
|
||||
}
|
||||
|
||||
_, err := e.client.ContainerCreate(ctx, config, hostConfig, nil, proc.Name)
|
||||
if docker.IsErrImageNotFound(err) {
|
||||
// automatically pull and try to re-create the image if the
|
||||
// failure is caused because the image does not exist.
|
||||
responseBody, perr := e.client.ImagePull(ctx, config.Image, pullopts)
|
||||
if perr != nil {
|
||||
return perr
|
||||
}
|
||||
defer responseBody.Close()
|
||||
fd, isTerminal := term.GetFdInfo(os.Stdout)
|
||||
jsonmessage.DisplayJSONMessagesStream(responseBody, os.Stdout, fd, isTerminal, nil)
|
||||
|
||||
_, err = e.client.ContainerCreate(ctx, config, hostConfig, nil, proc.Name)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(proc.NetworkMode) == 0 {
|
||||
for _, net := range proc.Networks {
|
||||
err = e.client.NetworkConnect(ctx, net.Name, proc.Name, &network.EndpointSettings{
|
||||
Aliases: net.Aliases,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if proc.Network != "host" { // or bridge, overlay, none, internal, container:<name> ....
|
||||
// err = e.client.NetworkConnect(ctx, proc.Network, proc.Name, &network.EndpointSettings{
|
||||
// Aliases: proc.NetworkAliases,
|
||||
// })
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// }
|
||||
|
||||
return e.client.ContainerStart(ctx, proc.Name, startOpts)
|
||||
}
|
||||
|
||||
func (e *engine) Kill(_ context.Context, proc *backend.Step) error {
|
||||
return e.client.ContainerKill(noContext, proc.Name, "9")
|
||||
}
|
||||
|
||||
func (e *engine) Wait(ctx context.Context, proc *backend.Step) (*backend.State, error) {
|
||||
wait, errc := e.client.ContainerWait(ctx, proc.Name, "")
|
||||
select {
|
||||
case <-wait:
|
||||
case <-errc:
|
||||
}
|
||||
|
||||
info, err := e.client.ContainerInspect(ctx, proc.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if info.State.Running {
|
||||
// todo
|
||||
}
|
||||
|
||||
return &backend.State{
|
||||
Exited: true,
|
||||
ExitCode: info.State.ExitCode,
|
||||
OOMKilled: info.State.OOMKilled,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *engine) Tail(ctx context.Context, proc *backend.Step) (io.ReadCloser, error) {
|
||||
logs, err := e.client.ContainerLogs(ctx, proc.Name, logsOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rc, wc := io.Pipe()
|
||||
|
||||
go func() {
|
||||
stdcopy.StdCopy(wc, wc, logs)
|
||||
logs.Close()
|
||||
wc.Close()
|
||||
rc.Close()
|
||||
}()
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
func (e *engine) Destroy(_ context.Context, conf *backend.Config) error {
|
||||
for _, stage := range conf.Stages {
|
||||
for _, step := range stage.Steps {
|
||||
e.client.ContainerKill(noContext, step.Name, "9")
|
||||
e.client.ContainerRemove(noContext, step.Name, removeOpts)
|
||||
}
|
||||
}
|
||||
for _, volume := range conf.Volumes {
|
||||
e.client.VolumeRemove(noContext, volume.Name, true)
|
||||
}
|
||||
for _, network := range conf.Networks {
|
||||
e.client.NetworkRemove(noContext, network.Name)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var (
|
||||
noContext = context.Background()
|
||||
|
||||
startOpts = types.ContainerStartOptions{}
|
||||
|
||||
removeOpts = types.ContainerRemoveOptions{
|
||||
RemoveVolumes: true,
|
||||
RemoveLinks: false,
|
||||
Force: false,
|
||||
}
|
||||
|
||||
logsOpts = types.ContainerLogsOptions{
|
||||
Follow: true,
|
||||
ShowStdout: true,
|
||||
ShowStderr: true,
|
||||
Details: false,
|
||||
Timestamps: false,
|
||||
}
|
||||
)
|
44
pipeline/backend/docker/pool.go
Normal file
44
pipeline/backend/docker/pool.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package docker
|
||||
|
||||
// import (
|
||||
// "context"
|
||||
//
|
||||
// "github.com/woodpecker-ci/woodpecker/pipeline/backend"
|
||||
// )
|
||||
//
|
||||
// // Pool manages a pool of Docker clients.
|
||||
// type Pool struct {
|
||||
// queue chan (backend.Engine)
|
||||
// }
|
||||
//
|
||||
// // NewPool returns a Pool.
|
||||
// func NewPool(engines ...backend.Engine) *Pool {
|
||||
// return &Pool{
|
||||
// queue: make(chan backend.Engine, len(engines)),
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // Reserve requests the next available Docker client in the pool.
|
||||
// func (p *Pool) Reserve(c context.Context) backend.Engine {
|
||||
// select {
|
||||
// case <-c.Done():
|
||||
// case engine := <-p.queue:
|
||||
// return engine
|
||||
// }
|
||||
// return nil
|
||||
// }
|
||||
//
|
||||
// // Release releases the Docker client back to the pool.
|
||||
// func (p *Pool) Release(engine backend.Engine) {
|
||||
// p.queue <- engine
|
||||
// }
|
||||
|
||||
// pool := docker.Pool(
|
||||
// docker.FromEnvironmentMust(),
|
||||
// docker.FromEnvironmentMust(),
|
||||
// docker.FromEnvironmentMust(),
|
||||
// docker.FromEnvironmentMust(),
|
||||
// )
|
||||
//
|
||||
// client := pool.Reserve()
|
||||
// defer pool.Release(client)
|
Reference in New Issue
Block a user