registry/storage/driver/azure: fix Move method

Something seems broken on azure/azure sdk side - it is currently not
possible to copy a blob of type AppendBlob using `CopyFromURL`.
Using the AppendBlob client via NewAppendBlobClient does not work
either.

According to Azure the correct way to do this is by using
StartCopyFromURL. Because this is an async operation, we need to do
polling ourselves. A simple backoff mechanism is used, where during each
iteration, the configured delay is multiplied by the retry number.

Also introduces two new config options for the Azure driver:
copy_status_poll_max_retry, and copy_status_poll_delay.

Signed-off-by: Flavian Missi <fmissi@redhat.com>
This commit is contained in:
Flavian Missi
2023-06-01 16:19:34 +02:00
parent ba46c769b3
commit 2b72c4d1ca
5 changed files with 149 additions and 29 deletions

View File

@@ -21,16 +21,17 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
const driverName = "azure"
const (
driverName = "azure"
maxChunkSize = 4 * 1024 * 1024
)
type driver struct {
azClient *azureClient
client *container.Client
rootDirectory string
azClient *azureClient
client *container.Client
rootDirectory string
copyStatusPollMaxRetry int
copyStatusPollDelay time.Duration
}
type baseEmbed struct{ base.Base }
@@ -59,11 +60,19 @@ func New(params *Parameters) (*Driver, error) {
if err != nil {
return nil, err
}
copyStatusPollDelay, err := time.ParseDuration(params.CopyStatusPollDelay)
if err != nil {
return nil, err
}
client := azClient.ContainerClient()
d := &driver{
azClient: azClient,
client: client,
rootDirectory: params.RootDirectory,
azClient: azClient,
client: client,
rootDirectory: params.RootDirectory,
copyStatusPollMaxRetry: params.CopyStatusPollMaxRetry,
copyStatusPollDelay: copyStatusPollDelay,
}
return &Driver{baseEmbed: baseEmbed{Base: base.Base{StorageDriver: d}}}, nil
}
@@ -282,7 +291,7 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
return err
}
destBlobRef := d.client.NewBlockBlobClient(d.blobName(destPath))
_, err = destBlobRef.CopyFromURL(ctx, sourceBlobURL, nil)
resp, err := destBlobRef.StartCopyFromURL(ctx, sourceBlobURL, nil)
if err != nil {
if is404(err) {
return storagedriver.PathNotFoundError{Path: sourcePath}
@@ -290,6 +299,39 @@ func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) e
return err
}
copyStatus := *resp.CopyStatus
if d.copyStatusPollMaxRetry == -1 && copyStatus == blob.CopyStatusTypePending {
destBlobRef.AbortCopyFromURL(ctx, *resp.CopyID, nil)
return nil
}
retryCount := 1
for copyStatus == blob.CopyStatusTypePending {
props, err := destBlobRef.GetProperties(ctx, nil)
if err != nil {
return err
}
if retryCount >= d.copyStatusPollMaxRetry {
destBlobRef.AbortCopyFromURL(ctx, *props.CopyID, nil)
return fmt.Errorf("max retries for copy polling reached, aborting copy")
}
copyStatus = *props.CopyStatus
if copyStatus == blob.CopyStatusTypeAborted || copyStatus == blob.CopyStatusTypeFailed {
if props.CopyStatusDescription != nil {
return fmt.Errorf("failed to move blob: %s", *props.CopyStatusDescription)
}
return fmt.Errorf("failed to move blob with copy id %s", *props.CopyID)
}
if copyStatus == blob.CopyStatusTypePending {
time.Sleep(d.copyStatusPollDelay * time.Duration(retryCount))
}
retryCount++
}
_, err = d.client.NewBlobClient(d.blobName(sourcePath)).Delete(ctx, nil)
return err
}

View File

@@ -1,7 +1,9 @@
package azure
import (
"context"
"fmt"
"math/rand"
"os"
"strings"
"testing"
@@ -19,6 +21,8 @@ const (
envRootDirectory = "AZURE_ROOT_DIRECTORY"
)
var azureDriverConstructor func() (storagedriver.StorageDriver, error)
// Hook up gocheck into the "go test" runner.
func Test(t *testing.T) { check.TestingT(t) }
@@ -36,10 +40,10 @@ func init() {
value *string
missingOk bool
}{
{envAccountName, &accountName, false},
{envAccountKey, &accountKey, false},
{envContainer, &container, false},
{envRealm, &realm, false},
{envAccountName, &accountName, true},
{envAccountKey, &accountKey, true},
{envContainer, &container, true},
{envRealm, &realm, true},
{envRootDirectory, &rootDirectory, true},
}
@@ -51,7 +55,7 @@ func init() {
}
}
azureDriverConstructor := func() (storagedriver.StorageDriver, error) {
azureDriverConstructor = func() (storagedriver.StorageDriver, error) {
parameters := map[string]interface{}{
"container": container,
"accountname": accountName,
@@ -77,6 +81,66 @@ func init() {
testsuites.RegisterSuite(azureDriverConstructor, skipCheck)
}
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func randStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
func TestCommitAfterMove(t *testing.T) {
driver, err := azureDriverConstructor()
if err != nil {
t.Fatalf("unexpected error creating azure driver: %v", err)
}
contents := randStringRunes(4 * 1024 * 1024)
sourcePath := "/source/file"
destPath := "/dest/file"
ctx := context.Background()
defer driver.Delete(ctx, sourcePath)
defer driver.Delete(ctx, destPath)
writer, err := driver.Writer(ctx, sourcePath, false)
if err != nil {
t.Fatalf("unexpected error from driver.Writer: %v", err)
}
_, err = writer.Write([]byte(contents))
if err != nil {
t.Fatalf("writer.Write: unexpected error: %v", err)
}
err = writer.Commit()
if err != nil {
t.Fatalf("writer.Commit: unexpected error: %v", err)
}
err = writer.Close()
if err != nil {
t.Fatalf("writer.Close: unexpected error: %v", err)
}
_, err = driver.GetContent(ctx, sourcePath)
if err != nil {
t.Fatalf("driver.GetContent(sourcePath): unexpected error: %v", err)
}
err = driver.Move(ctx, sourcePath, destPath)
if err != nil {
t.Fatalf("driver.Move: unexpected error: %v", err)
}
_, err = driver.GetContent(ctx, destPath)
if err != nil {
t.Fatalf("GetContent(destPath): unexpected error: %v", err)
}
}
func TestParamParsing(t *testing.T) {
expectErrors := []map[string]interface{}{
{},

View File

@@ -8,7 +8,9 @@ import (
)
const (
defaultRealm = "core.windows.net"
defaultRealm = "core.windows.net"
defaultCopyStatusPollMaxRetry = 5
defaultCopyStatusPollDelay = "100ms"
)
type Credentials struct {
@@ -19,14 +21,16 @@ type Credentials struct {
}
type Parameters struct {
Container string `mapstructure:"container"`
AccountName string `mapstructure:"accountname"`
AccountKey string `mapstructure:"accountkey"`
Credentials Credentials `mapstructure:"credentials"`
ConnectionString string `mapstructure:"connectionstring"`
Realm string `mapstructure:"realm"`
RootDirectory string `mapstructure:"rootdirectory"`
ServiceURL string `mapstructure:"serviceurl"`
Container string `mapstructure:"container"`
AccountName string `mapstructure:"accountname"`
AccountKey string `mapstructure:"accountkey"`
Credentials Credentials `mapstructure:"credentials"`
ConnectionString string `mapstructure:"connectionstring"`
Realm string `mapstructure:"realm"`
RootDirectory string `mapstructure:"rootdirectory"`
ServiceURL string `mapstructure:"serviceurl"`
CopyStatusPollMaxRetry int `mapstructure:"copy_status_poll_max_retry"`
CopyStatusPollDelay string `mapstructure:"copy_status_poll_delay"`
}
func NewParameters(parameters map[string]interface{}) (*Parameters, error) {
@@ -45,5 +49,11 @@ func NewParameters(parameters map[string]interface{}) (*Parameters, error) {
if params.ServiceURL == "" {
params.ServiceURL = fmt.Sprintf("https://%s.blob.%s", params.AccountName, params.Realm)
}
if params.CopyStatusPollMaxRetry == 0 {
params.CopyStatusPollMaxRetry = defaultCopyStatusPollMaxRetry
}
if params.CopyStatusPollDelay == "" {
params.CopyStatusPollDelay = defaultCopyStatusPollDelay
}
return &params, nil
}