mirror of
https://github.com/distribution/distribution.git
synced 2025-09-25 06:11:25 +00:00
Remove SWIFT storage driver
This commit removes swift storage driver from distribution. There are several reasons for it: * no real life expertise among the maintainers * swift is compatible with S3 API operations required by S3 storage driver This will also remove depedencies that are also hard to keep up with. Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
This commit is contained in:
@@ -1,932 +0,0 @@
|
||||
// Package swift provides a storagedriver.StorageDriver implementation to
|
||||
// store blobs in Openstack Swift object storage.
|
||||
//
|
||||
// This package leverages the ncw/swift client library for interfacing with
|
||||
// Swift.
|
||||
//
|
||||
// It supports both TempAuth authentication and Keystone authentication
|
||||
// (up to version 3).
|
||||
//
|
||||
// As Swift has a limit on the size of a single uploaded object (by default
|
||||
// this is 5GB), the driver makes use of the Swift Large Object Support
|
||||
// (http://docs.openstack.org/developer/swift/overview_large_objects.html).
|
||||
// Only one container is used for both manifests and data objects. Manifests
|
||||
// are stored in the 'files' pseudo directory, data objects are stored under
|
||||
// 'segments'.
|
||||
package swift
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha1"
|
||||
"crypto/tls"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
"github.com/ncw/swift"
|
||||
|
||||
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
||||
"github.com/distribution/distribution/v3/registry/storage/driver/base"
|
||||
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
|
||||
"github.com/distribution/distribution/v3/version"
|
||||
)
|
||||
|
||||
const driverName = "swift"
|
||||
|
||||
// defaultChunkSize defines the default size of a segment
|
||||
const defaultChunkSize = 20 * 1024 * 1024
|
||||
|
||||
// minChunkSize defines the minimum size of a segment
|
||||
const minChunkSize = 1 << 20
|
||||
|
||||
// contentType defines the Content-Type header associated with stored segments
|
||||
const contentType = "application/octet-stream"
|
||||
|
||||
// readAfterWriteTimeout defines the time we wait before an object appears after having been uploaded
|
||||
var readAfterWriteTimeout = 15 * time.Second
|
||||
|
||||
// readAfterWriteWait defines the time to sleep between two retries
|
||||
var readAfterWriteWait = 200 * time.Millisecond
|
||||
|
||||
// Parameters A struct that encapsulates all of the driver parameters after all values have been set
|
||||
type Parameters struct {
|
||||
Username string
|
||||
Password string
|
||||
AuthURL string
|
||||
Tenant string
|
||||
TenantID string
|
||||
Domain string
|
||||
DomainID string
|
||||
TenantDomain string
|
||||
TenantDomainID string
|
||||
TrustID string
|
||||
Region string
|
||||
AuthVersion int
|
||||
Container string
|
||||
Prefix string
|
||||
EndpointType string
|
||||
InsecureSkipVerify bool
|
||||
ChunkSize int
|
||||
SecretKey string
|
||||
AccessKey string
|
||||
TempURLContainerKey bool
|
||||
TempURLMethods []string
|
||||
}
|
||||
|
||||
// swiftInfo maps the JSON structure returned by Swift /info endpoint
|
||||
type swiftInfo struct {
|
||||
Swift struct {
|
||||
Version string `mapstructure:"version"`
|
||||
}
|
||||
Tempurl struct {
|
||||
Methods []string `mapstructure:"methods"`
|
||||
}
|
||||
BulkDelete struct {
|
||||
MaxDeletesPerRequest int `mapstructure:"max_deletes_per_request"`
|
||||
} `mapstructure:"bulk_delete"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
factory.Register(driverName, &swiftDriverFactory{})
|
||||
}
|
||||
|
||||
// swiftDriverFactory implements the factory.StorageDriverFactory interface
|
||||
type swiftDriverFactory struct{}
|
||||
|
||||
func (factory *swiftDriverFactory) Create(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
|
||||
return FromParameters(parameters)
|
||||
}
|
||||
|
||||
type driver struct {
|
||||
Conn *swift.Connection
|
||||
Container string
|
||||
Prefix string
|
||||
BulkDeleteSupport bool
|
||||
BulkDeleteMaxDeletes int
|
||||
ChunkSize int
|
||||
SecretKey string
|
||||
AccessKey string
|
||||
TempURLContainerKey bool
|
||||
TempURLMethods []string
|
||||
}
|
||||
|
||||
type baseEmbed struct {
|
||||
base.Base
|
||||
}
|
||||
|
||||
// Driver is a storagedriver.StorageDriver implementation backed by Openstack Swift
|
||||
// Objects are stored at absolute keys in the provided container.
|
||||
type Driver struct {
|
||||
baseEmbed
|
||||
}
|
||||
|
||||
// FromParameters constructs a new Driver with a given parameters map
|
||||
// Required parameters:
|
||||
// - username
|
||||
// - password
|
||||
// - authurl
|
||||
// - container
|
||||
func FromParameters(parameters map[string]interface{}) (*Driver, error) {
|
||||
params := Parameters{
|
||||
ChunkSize: defaultChunkSize,
|
||||
InsecureSkipVerify: false,
|
||||
}
|
||||
|
||||
// Sanitize some entries before trying to decode parameters with mapstructure
|
||||
// TenantID and Tenant when integers only and passed as ENV variables
|
||||
// are considered as integer and not string. The parser fails in this
|
||||
// case.
|
||||
_, ok := parameters["tenant"]
|
||||
if ok {
|
||||
parameters["tenant"] = fmt.Sprint(parameters["tenant"])
|
||||
}
|
||||
_, ok = parameters["tenantid"]
|
||||
if ok {
|
||||
parameters["tenantid"] = fmt.Sprint(parameters["tenantid"])
|
||||
}
|
||||
|
||||
if err := mapstructure.Decode(parameters, ¶ms); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if params.Username == "" {
|
||||
return nil, fmt.Errorf("no username parameter provided")
|
||||
}
|
||||
|
||||
if params.Password == "" {
|
||||
return nil, fmt.Errorf("no password parameter provided")
|
||||
}
|
||||
|
||||
if params.AuthURL == "" {
|
||||
return nil, fmt.Errorf("no authurl parameter provided")
|
||||
}
|
||||
|
||||
if params.Container == "" {
|
||||
return nil, fmt.Errorf("no container parameter provided")
|
||||
}
|
||||
|
||||
if params.ChunkSize < minChunkSize {
|
||||
return nil, fmt.Errorf("the chunksize %#v parameter should be a number that is larger than or equal to %d", params.ChunkSize, minChunkSize)
|
||||
}
|
||||
|
||||
return New(params)
|
||||
}
|
||||
|
||||
// New constructs a new Driver with the given Openstack Swift credentials and container name
|
||||
func New(params Parameters) (*Driver, error) {
|
||||
transport := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
MaxIdleConnsPerHost: 2048,
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: params.InsecureSkipVerify},
|
||||
}
|
||||
|
||||
ct := &swift.Connection{
|
||||
UserName: params.Username,
|
||||
ApiKey: params.Password,
|
||||
AuthUrl: params.AuthURL,
|
||||
Region: params.Region,
|
||||
AuthVersion: params.AuthVersion,
|
||||
UserAgent: "distribution/" + version.Version,
|
||||
Tenant: params.Tenant,
|
||||
TenantId: params.TenantID,
|
||||
Domain: params.Domain,
|
||||
DomainId: params.DomainID,
|
||||
TenantDomain: params.TenantDomain,
|
||||
TenantDomainId: params.TenantDomainID,
|
||||
TrustId: params.TrustID,
|
||||
EndpointType: swift.EndpointType(params.EndpointType),
|
||||
Transport: transport,
|
||||
ConnectTimeout: 60 * time.Second,
|
||||
Timeout: 15 * 60 * time.Second,
|
||||
}
|
||||
err := ct.Authenticate()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("swift authentication failed: %s", err)
|
||||
}
|
||||
|
||||
if _, _, err := ct.Container(params.Container); err == swift.ContainerNotFound {
|
||||
if err := ct.ContainerCreate(params.Container, nil); err != nil {
|
||||
return nil, fmt.Errorf("failed to create container %s (%s)", params.Container, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return nil, fmt.Errorf("failed to retrieve info about container %s (%s)", params.Container, err)
|
||||
}
|
||||
|
||||
d := &driver{
|
||||
Conn: ct,
|
||||
Container: params.Container,
|
||||
Prefix: params.Prefix,
|
||||
ChunkSize: params.ChunkSize,
|
||||
TempURLMethods: make([]string, 0),
|
||||
AccessKey: params.AccessKey,
|
||||
}
|
||||
|
||||
info := swiftInfo{}
|
||||
if config, err := d.Conn.QueryInfo(); err == nil {
|
||||
_, d.BulkDeleteSupport = config["bulk_delete"]
|
||||
|
||||
if err := mapstructure.Decode(config, &info); err == nil {
|
||||
d.TempURLContainerKey = info.Swift.Version >= "2.3.0"
|
||||
d.TempURLMethods = info.Tempurl.Methods
|
||||
if d.BulkDeleteSupport {
|
||||
d.BulkDeleteMaxDeletes = info.BulkDelete.MaxDeletesPerRequest
|
||||
}
|
||||
}
|
||||
} else {
|
||||
d.TempURLContainerKey = params.TempURLContainerKey
|
||||
d.TempURLMethods = params.TempURLMethods
|
||||
}
|
||||
|
||||
if len(d.TempURLMethods) > 0 {
|
||||
secretKey := params.SecretKey
|
||||
if secretKey == "" {
|
||||
secretKey, _ = generateSecret()
|
||||
}
|
||||
|
||||
// Since Swift 2.2.2, we can now set secret keys on containers
|
||||
// in addition to the account secret keys. Use them in preference.
|
||||
if d.TempURLContainerKey {
|
||||
_, containerHeaders, err := d.Conn.Container(d.Container)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch container info %s (%s)", d.Container, err)
|
||||
}
|
||||
|
||||
d.SecretKey = containerHeaders["X-Container-Meta-Temp-Url-Key"]
|
||||
if d.SecretKey == "" || (params.SecretKey != "" && d.SecretKey != params.SecretKey) {
|
||||
m := swift.Metadata{}
|
||||
m["temp-url-key"] = secretKey
|
||||
if d.Conn.ContainerUpdate(d.Container, m.ContainerHeaders()); err == nil {
|
||||
d.SecretKey = secretKey
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Use the account secret key
|
||||
_, accountHeaders, err := d.Conn.Account()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch account info (%s)", err)
|
||||
}
|
||||
|
||||
d.SecretKey = accountHeaders["X-Account-Meta-Temp-Url-Key"]
|
||||
if d.SecretKey == "" || (params.SecretKey != "" && d.SecretKey != params.SecretKey) {
|
||||
m := swift.Metadata{}
|
||||
m["temp-url-key"] = secretKey
|
||||
if err := d.Conn.AccountUpdate(m.AccountHeaders()); err == nil {
|
||||
d.SecretKey = secretKey
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &Driver{
|
||||
baseEmbed: baseEmbed{
|
||||
Base: base.Base{
|
||||
StorageDriver: d,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Implement the storagedriver.StorageDriver interface
|
||||
|
||||
func (d *driver) Name() string {
|
||||
return driverName
|
||||
}
|
||||
|
||||
// GetContent retrieves the content stored at "path" as a []byte.
|
||||
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
||||
content, err := d.Conn.ObjectGetBytes(d.Container, d.swiftPath(path))
|
||||
if err == swift.ObjectNotFound {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return content, err
|
||||
}
|
||||
|
||||
// PutContent stores the []byte content at a location designated by "path".
|
||||
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
||||
err := d.Conn.ObjectPutBytes(d.Container, d.swiftPath(path), contents, contentType)
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
||||
// given byte offset.
|
||||
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
headers := make(swift.Headers)
|
||||
headers["Range"] = "bytes=" + strconv.FormatInt(offset, 10) + "-"
|
||||
|
||||
waitingTime := readAfterWriteWait
|
||||
endTime := time.Now().Add(readAfterWriteTimeout)
|
||||
|
||||
for {
|
||||
file, headers, err := d.Conn.ObjectOpen(d.Container, d.swiftPath(path), false, headers)
|
||||
if err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
if swiftErr, ok := err.(*swift.Error); ok && swiftErr.StatusCode == http.StatusRequestedRangeNotSatisfiable {
|
||||
return io.NopCloser(bytes.NewReader(nil)), nil
|
||||
}
|
||||
return file, err
|
||||
}
|
||||
|
||||
// if this is a DLO and it is clear that segments are still missing,
|
||||
// wait until they show up
|
||||
_, isDLO := headers["X-Object-Manifest"]
|
||||
size, err := file.Length()
|
||||
if err != nil {
|
||||
return file, err
|
||||
}
|
||||
if isDLO && size == 0 {
|
||||
if time.Now().Add(waitingTime).After(endTime) {
|
||||
return nil, fmt.Errorf("timeout expired while waiting for segments of %s to show up", path)
|
||||
}
|
||||
time.Sleep(waitingTime)
|
||||
waitingTime *= 2
|
||||
continue
|
||||
}
|
||||
|
||||
// if not, then this reader will be fine
|
||||
return file, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Writer returns a FileWriter which will store the content written to it
|
||||
// at the location designated by "path" after the call to Commit.
|
||||
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
|
||||
var (
|
||||
segments []swift.Object
|
||||
segmentsPath string
|
||||
err error
|
||||
)
|
||||
|
||||
if !append {
|
||||
segmentsPath, err = d.swiftSegmentPath(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
info, headers, err := d.Conn.Object(d.Container, d.swiftPath(path))
|
||||
if err == swift.ObjectNotFound {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
manifest, ok := headers["X-Object-Manifest"]
|
||||
if !ok {
|
||||
segmentsPath, err = d.swiftSegmentPath(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.Conn.ObjectMove(d.Container, d.swiftPath(path), d.Container, getSegmentPath(segmentsPath, len(segments))); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
segments = []swift.Object{info}
|
||||
} else {
|
||||
_, segmentsPath = parseManifest(manifest)
|
||||
if segments, err = d.getAllSegments(segmentsPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return d.newWriter(path, segmentsPath, segments), nil
|
||||
}
|
||||
|
||||
// Stat retrieves the FileInfo for the given path, including the current size
|
||||
// in bytes and the creation time.
|
||||
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
|
||||
swiftPath := d.swiftPath(path)
|
||||
opts := &swift.ObjectsOpts{
|
||||
Prefix: swiftPath,
|
||||
Delimiter: '/',
|
||||
}
|
||||
|
||||
objects, err := d.Conn.ObjectsAll(d.Container, opts)
|
||||
if err != nil {
|
||||
if err == swift.ContainerNotFound {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fi := storagedriver.FileInfoFields{
|
||||
Path: strings.TrimPrefix(strings.TrimSuffix(swiftPath, "/"), d.swiftPath("/")),
|
||||
}
|
||||
|
||||
for _, obj := range objects {
|
||||
if obj.PseudoDirectory && obj.Name == swiftPath+"/" {
|
||||
fi.IsDir = true
|
||||
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||
} else if obj.Name == swiftPath {
|
||||
// The file exists. But on Swift 1.12, the 'bytes' field is always 0 so
|
||||
// we need to do a separate HEAD request.
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Don't trust an empty `objects` slice. A container listing can be
|
||||
// outdated. For files, we can make a HEAD request on the object which
|
||||
// reports existence (at least) much more reliably.
|
||||
waitingTime := readAfterWriteWait
|
||||
endTime := time.Now().Add(readAfterWriteTimeout)
|
||||
|
||||
for {
|
||||
info, headers, err := d.Conn.Object(d.Container, swiftPath)
|
||||
if err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if this is a DLO and it is clear that segments are still missing,
|
||||
// wait until they show up
|
||||
_, isDLO := headers["X-Object-Manifest"]
|
||||
if isDLO && info.Bytes == 0 {
|
||||
if time.Now().Add(waitingTime).After(endTime) {
|
||||
return nil, fmt.Errorf("timeout expired while waiting for segments of %s to show up", path)
|
||||
}
|
||||
time.Sleep(waitingTime)
|
||||
waitingTime *= 2
|
||||
continue
|
||||
}
|
||||
|
||||
// otherwise, accept the result
|
||||
fi.IsDir = false
|
||||
fi.Size = info.Bytes
|
||||
fi.ModTime = info.LastModified
|
||||
return storagedriver.FileInfoInternal{FileInfoFields: fi}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// List returns a list of the objects that are direct descendants of the given path.
|
||||
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
|
||||
var files []string
|
||||
|
||||
prefix := d.swiftPath(path)
|
||||
if prefix != "" {
|
||||
prefix += "/"
|
||||
}
|
||||
|
||||
opts := &swift.ObjectsOpts{
|
||||
Prefix: prefix,
|
||||
Delimiter: '/',
|
||||
}
|
||||
|
||||
objects, err := d.Conn.ObjectsAll(d.Container, opts)
|
||||
for _, obj := range objects {
|
||||
files = append(files, strings.TrimPrefix(strings.TrimSuffix(obj.Name, "/"), d.swiftPath("/")))
|
||||
}
|
||||
|
||||
if err == swift.ContainerNotFound || (len(objects) == 0 && path != "/") {
|
||||
return files, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return files, err
|
||||
}
|
||||
|
||||
// Move moves an object stored at sourcePath to destPath, removing the original
|
||||
// object.
|
||||
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
|
||||
_, headers, err := d.Conn.Object(d.Container, d.swiftPath(sourcePath))
|
||||
if err == nil {
|
||||
if manifest, ok := headers["X-Object-Manifest"]; ok {
|
||||
if err = d.createManifest(destPath, manifest); err != nil {
|
||||
return err
|
||||
}
|
||||
err = d.Conn.ObjectDelete(d.Container, d.swiftPath(sourcePath))
|
||||
} else {
|
||||
err = d.Conn.ObjectMove(d.Container, d.swiftPath(sourcePath), d.Container, d.swiftPath(destPath))
|
||||
}
|
||||
}
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: sourcePath}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete recursively deletes all objects stored at "path" and its subpaths.
|
||||
func (d *driver) Delete(ctx context.Context, path string) error {
|
||||
opts := swift.ObjectsOpts{
|
||||
Prefix: d.swiftPath(path) + "/",
|
||||
}
|
||||
|
||||
objects, err := d.Conn.ObjectsAll(d.Container, &opts)
|
||||
if err != nil {
|
||||
if err == swift.ContainerNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for _, obj := range objects {
|
||||
if obj.PseudoDirectory {
|
||||
continue
|
||||
}
|
||||
if _, headers, err := d.Conn.Object(d.Container, obj.Name); err == nil {
|
||||
manifest, ok := headers["X-Object-Manifest"]
|
||||
if ok {
|
||||
_, prefix := parseManifest(manifest)
|
||||
segments, err := d.getAllSegments(prefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
objects = append(objects, segments...)
|
||||
}
|
||||
} else {
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: obj.Name}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if d.BulkDeleteSupport && len(objects) > 0 && d.BulkDeleteMaxDeletes > 0 {
|
||||
filenames := make([]string, len(objects))
|
||||
for i, obj := range objects {
|
||||
filenames[i] = obj.Name
|
||||
}
|
||||
|
||||
chunks, err := chunkFilenames(filenames, d.BulkDeleteMaxDeletes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, chunk := range chunks {
|
||||
_, err := d.Conn.BulkDelete(d.Container, chunk)
|
||||
// Don't fail on ObjectNotFound because eventual consistency
|
||||
// makes this situation normal.
|
||||
if err != nil && err != swift.Forbidden && err != swift.ObjectNotFound {
|
||||
if err == swift.ContainerNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for _, obj := range objects {
|
||||
if err := d.Conn.ObjectDelete(d.Container, obj.Name); err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: obj.Name}
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, _, err = d.Conn.Object(d.Container, d.swiftPath(path))
|
||||
if err == nil {
|
||||
if err := d.Conn.ObjectDelete(d.Container, d.swiftPath(path)); err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return err
|
||||
}
|
||||
} else if err == swift.ObjectNotFound {
|
||||
if len(objects) == 0 {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// URLFor returns a URL which may be used to retrieve the content stored at the given path.
|
||||
func (d *driver) URLFor(ctx context.Context, path string, options map[string]interface{}) (string, error) {
|
||||
if d.SecretKey == "" {
|
||||
return "", storagedriver.ErrUnsupportedMethod{}
|
||||
}
|
||||
|
||||
methodString := http.MethodGet
|
||||
method, ok := options["method"]
|
||||
if ok {
|
||||
if methodString, ok = method.(string); !ok {
|
||||
return "", storagedriver.ErrUnsupportedMethod{}
|
||||
}
|
||||
}
|
||||
|
||||
if methodString == http.MethodHead {
|
||||
// A "HEAD" request on a temporary URL is allowed if the
|
||||
// signature was generated with "GET", "POST" or "PUT"
|
||||
methodString = http.MethodGet
|
||||
}
|
||||
|
||||
supported := false
|
||||
for _, method := range d.TempURLMethods {
|
||||
if method == methodString {
|
||||
supported = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !supported {
|
||||
return "", storagedriver.ErrUnsupportedMethod{}
|
||||
}
|
||||
|
||||
expiresTime := time.Now().Add(20 * time.Minute)
|
||||
expires, ok := options["expiry"]
|
||||
if ok {
|
||||
et, ok := expires.(time.Time)
|
||||
if ok {
|
||||
expiresTime = et
|
||||
}
|
||||
}
|
||||
|
||||
tempURL := d.Conn.ObjectTempUrl(d.Container, d.swiftPath(path), d.SecretKey, methodString, expiresTime)
|
||||
|
||||
if d.AccessKey != "" {
|
||||
// On HP Cloud, the signature must be in the form of tenant_id:access_key:signature
|
||||
url, _ := url.Parse(tempURL)
|
||||
query := url.Query()
|
||||
query.Set("temp_url_sig", fmt.Sprintf("%s:%s:%s", d.Conn.TenantId, d.AccessKey, query.Get("temp_url_sig")))
|
||||
url.RawQuery = query.Encode()
|
||||
tempURL = url.String()
|
||||
}
|
||||
|
||||
return tempURL, nil
|
||||
}
|
||||
|
||||
// Walk traverses a filesystem defined within driver, starting
|
||||
// from the given path, calling f on each file and directory
|
||||
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn) error {
|
||||
return storagedriver.WalkFallback(ctx, d, path, f)
|
||||
}
|
||||
|
||||
func (d *driver) swiftPath(path string) string {
|
||||
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/files"+path, "/"), "/")
|
||||
}
|
||||
|
||||
// swiftSegmentPath returns a randomly generated path in the segments directory.
|
||||
func (d *driver) swiftSegmentPath(path string) (string, error) {
|
||||
checksum := sha1.New()
|
||||
checksum.Write([]byte(path))
|
||||
|
||||
if _, err := io.CopyN(checksum, rand.Reader, 32); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
path = hex.EncodeToString(checksum.Sum(nil))
|
||||
return strings.TrimLeft(strings.TrimRight(d.Prefix+"/segments/"+path[0:3]+"/"+path[3:], "/"), "/"), nil
|
||||
}
|
||||
|
||||
func (d *driver) getAllSegments(path string) ([]swift.Object, error) {
|
||||
// a simple container listing works 99.9% of the time
|
||||
segments, err := d.Conn.ObjectsAll(d.Container, &swift.ObjectsOpts{Prefix: path})
|
||||
if err != nil {
|
||||
if err == swift.ContainerNotFound {
|
||||
return nil, storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// build a lookup table by object name
|
||||
hasObjectName := make(map[string]struct{})
|
||||
for _, segment := range segments {
|
||||
hasObjectName[segment.Name] = struct{}{}
|
||||
}
|
||||
|
||||
// The container listing might be outdated (i.e. not contain all existing
|
||||
// segment objects yet) because of temporary inconsistency (Swift is only
|
||||
// eventually consistent!). Check its completeness.
|
||||
segmentNumber := 0
|
||||
for {
|
||||
segmentNumber++
|
||||
segmentPath := getSegmentPath(path, segmentNumber)
|
||||
|
||||
if _, seen := hasObjectName[segmentPath]; seen {
|
||||
continue
|
||||
}
|
||||
|
||||
// This segment is missing in the container listing. Use a more reliable
|
||||
// request to check its existence. (HEAD requests on segments are
|
||||
// guaranteed to return the correct metadata, except for the pathological
|
||||
// case of an outage of large parts of the Swift cluster or its network,
|
||||
// since every segment is only written once.)
|
||||
segment, _, err := d.Conn.Object(d.Container, segmentPath)
|
||||
switch err {
|
||||
case nil:
|
||||
// found new segment -> keep going, more might be missing
|
||||
segments = append(segments, segment)
|
||||
continue
|
||||
case swift.ObjectNotFound:
|
||||
// This segment is missing. Since we upload segments sequentially,
|
||||
// there won't be any more segments after it.
|
||||
return segments, nil
|
||||
default:
|
||||
return nil, err // unexpected error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) createManifest(path string, segments string) error {
|
||||
headers := make(swift.Headers)
|
||||
headers["X-Object-Manifest"] = segments
|
||||
manifest, err := d.Conn.ObjectCreate(d.Container, d.swiftPath(path), false, "", contentType, headers)
|
||||
if err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return err
|
||||
}
|
||||
if err := manifest.Close(); err != nil {
|
||||
if err == swift.ObjectNotFound {
|
||||
return storagedriver.PathNotFoundError{Path: path}
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func chunkFilenames(slice []string, maxSize int) (chunks [][]string, err error) {
|
||||
if maxSize > 0 {
|
||||
for offset := 0; offset < len(slice); offset += maxSize {
|
||||
chunkSize := maxSize
|
||||
if offset+chunkSize > len(slice) {
|
||||
chunkSize = len(slice) - offset
|
||||
}
|
||||
chunks = append(chunks, slice[offset:offset+chunkSize])
|
||||
}
|
||||
} else {
|
||||
return nil, fmt.Errorf("max chunk size must be > 0")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func parseManifest(manifest string) (container string, prefix string) {
|
||||
container, prefix, _ = strings.Cut(manifest, "/")
|
||||
return container, prefix
|
||||
}
|
||||
|
||||
func generateSecret() (string, error) {
|
||||
var secretBytes [32]byte
|
||||
if _, err := rand.Read(secretBytes[:]); err != nil {
|
||||
return "", fmt.Errorf("could not generate random bytes for Swift secret key: %v", err)
|
||||
}
|
||||
return hex.EncodeToString(secretBytes[:]), nil
|
||||
}
|
||||
|
||||
func getSegmentPath(segmentsPath string, partNumber int) string {
|
||||
return fmt.Sprintf("%s/%016d", segmentsPath, partNumber)
|
||||
}
|
||||
|
||||
type writer struct {
|
||||
driver *driver
|
||||
path string
|
||||
segmentsPath string
|
||||
size int64
|
||||
bw *bufio.Writer
|
||||
closed bool
|
||||
committed bool
|
||||
cancelled bool
|
||||
}
|
||||
|
||||
func (d *driver) newWriter(path, segmentsPath string, segments []swift.Object) storagedriver.FileWriter {
|
||||
var size int64
|
||||
for _, segment := range segments {
|
||||
size += segment.Bytes
|
||||
}
|
||||
return &writer{
|
||||
driver: d,
|
||||
path: path,
|
||||
segmentsPath: segmentsPath,
|
||||
size: size,
|
||||
bw: bufio.NewWriterSize(&segmentWriter{
|
||||
conn: d.Conn,
|
||||
container: d.Container,
|
||||
segmentsPath: segmentsPath,
|
||||
segmentNumber: len(segments) + 1,
|
||||
maxChunkSize: d.ChunkSize,
|
||||
}, d.ChunkSize),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writer) Write(p []byte) (int, error) {
|
||||
if w.closed {
|
||||
return 0, fmt.Errorf("already closed")
|
||||
} else if w.committed {
|
||||
return 0, fmt.Errorf("already committed")
|
||||
} else if w.cancelled {
|
||||
return 0, fmt.Errorf("already cancelled")
|
||||
}
|
||||
|
||||
n, err := w.bw.Write(p)
|
||||
w.size += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (w *writer) Size() int64 {
|
||||
return w.size
|
||||
}
|
||||
|
||||
func (w *writer) Close() error {
|
||||
if w.closed {
|
||||
return fmt.Errorf("already closed")
|
||||
}
|
||||
|
||||
if err := w.bw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !w.committed && !w.cancelled {
|
||||
if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := w.waitForSegmentsToShowUp(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
w.closed = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *writer) Cancel(ctx context.Context) error {
|
||||
if w.closed {
|
||||
return fmt.Errorf("already closed")
|
||||
} else if w.committed {
|
||||
return fmt.Errorf("already committed")
|
||||
}
|
||||
w.cancelled = true
|
||||
return w.driver.Delete(ctx, w.path)
|
||||
}
|
||||
|
||||
func (w *writer) Commit() error {
|
||||
if w.closed {
|
||||
return fmt.Errorf("already closed")
|
||||
} else if w.committed {
|
||||
return fmt.Errorf("already committed")
|
||||
} else if w.cancelled {
|
||||
return fmt.Errorf("already cancelled")
|
||||
}
|
||||
|
||||
if err := w.bw.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := w.driver.createManifest(w.path, w.driver.Container+"/"+w.segmentsPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.committed = true
|
||||
return w.waitForSegmentsToShowUp()
|
||||
}
|
||||
|
||||
func (w *writer) waitForSegmentsToShowUp() error {
|
||||
var err error
|
||||
waitingTime := readAfterWriteWait
|
||||
endTime := time.Now().Add(readAfterWriteTimeout)
|
||||
|
||||
for {
|
||||
var info swift.Object
|
||||
if info, _, err = w.driver.Conn.Object(w.driver.Container, w.driver.swiftPath(w.path)); err == nil {
|
||||
if info.Bytes == w.size {
|
||||
break
|
||||
}
|
||||
err = fmt.Errorf("timeout expired while waiting for segments of %s to show up", w.path)
|
||||
}
|
||||
if time.Now().Add(waitingTime).After(endTime) {
|
||||
break
|
||||
}
|
||||
time.Sleep(waitingTime)
|
||||
waitingTime *= 2
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
type segmentWriter struct {
|
||||
conn *swift.Connection
|
||||
container string
|
||||
segmentsPath string
|
||||
segmentNumber int
|
||||
maxChunkSize int
|
||||
}
|
||||
|
||||
func (sw *segmentWriter) Write(p []byte) (int, error) {
|
||||
n := 0
|
||||
for offset := 0; offset < len(p); offset += sw.maxChunkSize {
|
||||
chunkSize := sw.maxChunkSize
|
||||
if offset+chunkSize > len(p) {
|
||||
chunkSize = len(p) - offset
|
||||
}
|
||||
_, err := sw.conn.ObjectPut(sw.container, getSegmentPath(sw.segmentsPath, sw.segmentNumber), bytes.NewReader(p[offset:offset+chunkSize]), false, "", contentType, nil)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
|
||||
sw.segmentNumber++
|
||||
n += chunkSize
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
@@ -1,257 +0,0 @@
|
||||
package swift
|
||||
|
||||
import (
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/ncw/swift/swifttest"
|
||||
|
||||
"github.com/distribution/distribution/v3/context"
|
||||
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
|
||||
"github.com/distribution/distribution/v3/registry/storage/driver/testsuites"
|
||||
|
||||
"gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) { check.TestingT(t) }
|
||||
|
||||
var swiftDriverConstructor func(prefix string) (*Driver, error)
|
||||
|
||||
func init() {
|
||||
var (
|
||||
username = os.Getenv("SWIFT_USERNAME")
|
||||
password = os.Getenv("SWIFT_PASSWORD")
|
||||
authURL = os.Getenv("SWIFT_AUTH_URL")
|
||||
tenant = os.Getenv("SWIFT_TENANT_NAME")
|
||||
tenantID = os.Getenv("SWIFT_TENANT_ID")
|
||||
domain = os.Getenv("SWIFT_DOMAIN_NAME")
|
||||
domainID = os.Getenv("SWIFT_DOMAIN_ID")
|
||||
tenantDomain = os.Getenv("SWIFT_DOMAIN_NAME")
|
||||
tenantDomainID = os.Getenv("SWIFT_DOMAIN_ID")
|
||||
trustID = os.Getenv("SWIFT_TRUST_ID")
|
||||
container = os.Getenv("SWIFT_CONTAINER_NAME")
|
||||
region = os.Getenv("SWIFT_REGION_NAME")
|
||||
AuthVersion, _ = strconv.Atoi(os.Getenv("SWIFT_AUTH_VERSION"))
|
||||
endpointType = os.Getenv("SWIFT_ENDPOINT_TYPE")
|
||||
insecureSkipVerify, _ = strconv.ParseBool(os.Getenv("SWIFT_INSECURESKIPVERIFY"))
|
||||
secretKey = os.Getenv("SWIFT_SECRET_KEY")
|
||||
accessKey = os.Getenv("SWIFT_ACCESS_KEY")
|
||||
containerKey, _ = strconv.ParseBool(os.Getenv("SWIFT_TEMPURL_CONTAINERKEY"))
|
||||
tempURLMethods = strings.Split(os.Getenv("SWIFT_TEMPURL_METHODS"), ",")
|
||||
|
||||
swiftServer *swifttest.SwiftServer
|
||||
err error
|
||||
)
|
||||
|
||||
if username == "" || password == "" || authURL == "" || container == "" {
|
||||
if swiftServer, err = swifttest.NewSwiftServer("localhost"); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
username = "swifttest"
|
||||
password = "swifttest"
|
||||
authURL = swiftServer.AuthURL
|
||||
container = "test"
|
||||
}
|
||||
|
||||
prefix, err := os.MkdirTemp("", "driver-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer os.Remove(prefix)
|
||||
|
||||
swiftDriverConstructor = func(root string) (*Driver, error) {
|
||||
parameters := Parameters{
|
||||
username,
|
||||
password,
|
||||
authURL,
|
||||
tenant,
|
||||
tenantID,
|
||||
domain,
|
||||
domainID,
|
||||
tenantDomain,
|
||||
tenantDomainID,
|
||||
trustID,
|
||||
region,
|
||||
AuthVersion,
|
||||
container,
|
||||
root,
|
||||
endpointType,
|
||||
insecureSkipVerify,
|
||||
defaultChunkSize,
|
||||
secretKey,
|
||||
accessKey,
|
||||
containerKey,
|
||||
tempURLMethods,
|
||||
}
|
||||
|
||||
return New(parameters)
|
||||
}
|
||||
|
||||
driverConstructor := func() (storagedriver.StorageDriver, error) {
|
||||
return swiftDriverConstructor(prefix)
|
||||
}
|
||||
|
||||
testsuites.RegisterSuite(driverConstructor, testsuites.NeverSkip)
|
||||
}
|
||||
|
||||
func TestEmptyRootList(t *testing.T) {
|
||||
validRoot := t.TempDir()
|
||||
|
||||
rootedDriver, err := swiftDriverConstructor(validRoot)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating rooted driver: %v", err)
|
||||
}
|
||||
|
||||
emptyRootDriver, err := swiftDriverConstructor("")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating empty root driver: %v", err)
|
||||
}
|
||||
|
||||
slashRootDriver, err := swiftDriverConstructor("/")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating slash root driver: %v", err)
|
||||
}
|
||||
|
||||
filename := "/test"
|
||||
contents := []byte("contents")
|
||||
ctx := context.Background()
|
||||
err = rootedDriver.PutContent(ctx, filename, contents)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating content: %v", err)
|
||||
}
|
||||
|
||||
keys, _ := emptyRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||
}
|
||||
}
|
||||
|
||||
keys, _ = slashRootDriver.List(ctx, "/")
|
||||
for _, path := range keys {
|
||||
if !storagedriver.PathRegexp.MatchString(path) {
|
||||
t.Fatalf("unexpected string in path: %q != %q", path, storagedriver.PathRegexp)
|
||||
}
|
||||
}
|
||||
|
||||
// Create an object with a path nested under the existing object
|
||||
err = rootedDriver.PutContent(ctx, filename+"/file1", contents)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating content: %v", err)
|
||||
}
|
||||
|
||||
err = rootedDriver.Delete(ctx, filename)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to delete: %v", err)
|
||||
}
|
||||
|
||||
keys, err = rootedDriver.List(ctx, "/")
|
||||
if err != nil {
|
||||
t.Fatalf("failed to list objects after deletion: %v", err)
|
||||
}
|
||||
|
||||
if len(keys) != 0 {
|
||||
t.Fatal("delete did not remove nested objects")
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilenameChunking(t *testing.T) {
|
||||
// Test valid input and sizes
|
||||
input := []string{"a", "b", "c", "d", "e"}
|
||||
expecteds := [][][]string{
|
||||
{
|
||||
{"a"},
|
||||
{"b"},
|
||||
{"c"},
|
||||
{"d"},
|
||||
{"e"},
|
||||
},
|
||||
{
|
||||
{"a", "b"},
|
||||
{"c", "d"},
|
||||
{"e"},
|
||||
},
|
||||
{
|
||||
{"a", "b", "c"},
|
||||
{"d", "e"},
|
||||
},
|
||||
{
|
||||
{"a", "b", "c", "d"},
|
||||
{"e"},
|
||||
},
|
||||
{
|
||||
{"a", "b", "c", "d", "e"},
|
||||
},
|
||||
{
|
||||
{"a", "b", "c", "d", "e"},
|
||||
},
|
||||
}
|
||||
for i, expected := range expecteds {
|
||||
actual, err := chunkFilenames(input, i+1)
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Fatalf("chunk %v didn't match expected value %v", actual, expected)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error chunking filenames: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Test nil input
|
||||
actual, err := chunkFilenames(nil, 5)
|
||||
if len(actual) != 0 {
|
||||
t.Fatal("chunks were returned when passed nil")
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error chunking filenames: %v", err)
|
||||
}
|
||||
|
||||
// Test 0 and < 0 sizes
|
||||
_, err = chunkFilenames(nil, 0)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for size = 0")
|
||||
}
|
||||
_, err = chunkFilenames(nil, -1)
|
||||
if err == nil {
|
||||
t.Fatal("expected error for size = -1")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSwiftSegmentPath(t *testing.T) {
|
||||
d := &driver{
|
||||
Prefix: "/test/segment/path",
|
||||
}
|
||||
|
||||
s1, err := d.swiftSegmentPath("foo-baz")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error generating segment path: %v", err)
|
||||
}
|
||||
|
||||
s2, err := d.swiftSegmentPath("foo-baz")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error generating segment path: %v", err)
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(s1, "test/segment/path/segments/") {
|
||||
t.Fatalf("expected to be prefixed: %s", s1)
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(s1, "test/segment/path/segments/") {
|
||||
t.Fatalf("expected to be prefixed: %s", s2)
|
||||
}
|
||||
|
||||
if len(s1) != 68 {
|
||||
t.Fatalf("unexpected segment path length, %d != %d", len(s1), 68)
|
||||
}
|
||||
|
||||
if len(s2) != 68 {
|
||||
t.Fatalf("unexpected segment path length, %d != %d", len(s2), 68)
|
||||
}
|
||||
|
||||
if s1 == s2 {
|
||||
t.Fatalf("expected segment paths to differ, %s == %s", s1, s2)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user