Add regulator to GCS

Signed-off-by: Huu Nguyen <whoshuu@gmail.com>
This commit is contained in:
Tony Holdstock-Brown
2016-12-06 15:50:39 -08:00
committed by Ryan Abrams
parent b424c3d870
commit f9187b2572
4 changed files with 114 additions and 40 deletions

View File

@@ -9,8 +9,6 @@
//
// Note that the contents of incomplete uploads are not accessible even though
// Stat returns their length
//
// +build include_gcs
package gcs
@@ -50,6 +48,8 @@ const (
uploadSessionContentType = "application/x-docker-upload-session"
minChunkSize = 256 * 1024
defaultChunkSize = 20 * minChunkSize
defaultMaxConcurrency = 50
minConcurrency = 25
maxTries = 5
)
@@ -65,6 +65,12 @@ type driverParameters struct {
client *http.Client
rootDirectory string
chunkSize int
// maxConcurrency limits the number of concurrent driver operations
// to GCS, which ultimately increases reliability of many simultaneous
// pushes by ensuring we aren't DoSing our own server with many
// connections.
maxConcurrency uint64
}
func init() {
@@ -90,6 +96,16 @@ type driver struct {
chunkSize int
}
// Wrapper wraps `driver` with a throttler, ensuring that no more than N
// GCS actions can occur concurrently. The default limit is 75.
type Wrapper struct {
baseEmbed
}
type baseEmbed struct {
base.Base
}
// FromParameters constructs a new Driver with a given parameters map
// Required parameters:
// - bucket
@@ -174,13 +190,19 @@ func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDri
}
}
maxConcurrency, err := base.GetLimitFromParameter(parameters["maxconcurrency"], minConcurrency, defaultMaxConcurrency)
if err != nil {
return nil, fmt.Errorf("maxconcurrency config error: %s", err)
}
params := driverParameters{
bucket: fmt.Sprint(bucket),
rootDirectory: fmt.Sprint(rootDirectory),
email: jwtConf.Email,
privateKey: jwtConf.PrivateKey,
client: oauth2.NewClient(context.Background(), ts),
chunkSize: chunkSize,
bucket: fmt.Sprint(bucket),
rootDirectory: fmt.Sprint(rootDirectory),
email: jwtConf.Email,
privateKey: jwtConf.PrivateKey,
client: oauth2.NewClient(context.Background(), ts),
chunkSize: chunkSize,
maxConcurrency: maxConcurrency,
}
return New(params)
@@ -204,8 +226,12 @@ func New(params driverParameters) (storagedriver.StorageDriver, error) {
chunkSize: params.chunkSize,
}
return &base.Base{
StorageDriver: d,
return &Wrapper{
baseEmbed: baseEmbed{
Base: base.Base{
StorageDriver: base.NewRegulator(d, params.maxConcurrency),
},
},
}, nil
}
@@ -890,7 +916,7 @@ func (d *driver) context(context context.Context) context.Context {
}
func (d *driver) pathToKey(path string) string {
return strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/")
return strings.TrimSpace(strings.TrimRight(d.rootDirectory+strings.TrimLeft(path, "/"), "/"))
}
func (d *driver) pathToDirKey(path string) string {