mirror of
https://github.com/distribution/distribution.git
synced 2025-09-17 23:59:57 +00:00
Replace redigo with redis-go
We are replacing the very outdated redigo Go module with the official redis Go module, go-redis. Signed-off-by: Milos Gajdos <milosthegajdos@gmail.com>
This commit is contained in:
104
registry/storage/cache/redis/redis.go
vendored
104
registry/storage/cache/redis/redis.go
vendored
@@ -3,13 +3,14 @@ package redis
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/distribution/distribution/v3"
|
||||
"github.com/distribution/distribution/v3/reference"
|
||||
"github.com/distribution/distribution/v3/registry/storage/cache"
|
||||
"github.com/distribution/distribution/v3/registry/storage/cache/metrics"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/opencontainers/go-digest"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// redisBlobStatService provides an implementation of
|
||||
@@ -24,7 +25,7 @@ import (
|
||||
// Note that there is no implied relationship between these two caches. The
|
||||
// layer may exist in one, both or none and the code must be written this way.
|
||||
type redisBlobDescriptorService struct {
|
||||
pool *redis.Pool
|
||||
pool *redis.Client
|
||||
|
||||
// TODO(stevvooe): We use a pool because we don't have great control over
|
||||
// the cache lifecycle to manage connections. A new connection if fetched
|
||||
@@ -34,7 +35,7 @@ type redisBlobDescriptorService struct {
|
||||
|
||||
// NewRedisBlobDescriptorCacheProvider returns a new redis-based
|
||||
// BlobDescriptorCacheProvider using the provided redis connection pool.
|
||||
func NewRedisBlobDescriptorCacheProvider(pool *redis.Pool) cache.BlobDescriptorCacheProvider {
|
||||
func NewRedisBlobDescriptorCacheProvider(pool *redis.Client) cache.BlobDescriptorCacheProvider {
|
||||
return metrics.NewPrometheusCacheProvider(
|
||||
&redisBlobDescriptorService{
|
||||
pool: pool,
|
||||
@@ -62,10 +63,7 @@ func (rbds *redisBlobDescriptorService) Stat(ctx context.Context, dgst digest.Di
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
conn := rbds.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return rbds.stat(ctx, conn, dgst)
|
||||
return rbds.stat(ctx, rbds.pool, dgst)
|
||||
}
|
||||
|
||||
func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.Digest) error {
|
||||
@@ -73,26 +71,23 @@ func (rbds *redisBlobDescriptorService) Clear(ctx context.Context, dgst digest.D
|
||||
return err
|
||||
}
|
||||
|
||||
conn := rbds.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
// Not atomic in redis <= 2.3
|
||||
reply, err := conn.Do("HDEL", rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype")
|
||||
cmd := rbds.pool.HDel(ctx, rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype")
|
||||
res, err := cmd.Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if reply == 0 {
|
||||
if res == 0 {
|
||||
return distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// stat provides an internal stat call that takes a connection parameter. This
|
||||
// allows some internal management of the connection scope.
|
||||
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Conn, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
reply, err := redis.Values(conn.Do("HMGET", rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype"))
|
||||
func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn *redis.Client, dgst digest.Digest) (distribution.Descriptor, error) {
|
||||
cmd := conn.HMGet(ctx, rbds.blobDescriptorHashKey(dgst), "digest", "size", "mediatype")
|
||||
reply, err := cmd.Result()
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
@@ -105,10 +100,26 @@ func (rbds *redisBlobDescriptorService) stat(ctx context.Context, conn redis.Con
|
||||
}
|
||||
|
||||
var desc distribution.Descriptor
|
||||
if _, err := redis.Scan(reply, &desc.Digest, &desc.Size, &desc.MediaType); err != nil {
|
||||
digestString, ok := reply[0].(string)
|
||||
if !ok {
|
||||
return distribution.Descriptor{}, fmt.Errorf("digest is not a string")
|
||||
}
|
||||
desc.Digest = digest.Digest(digestString)
|
||||
sizeString, ok := reply[1].(string)
|
||||
if !ok {
|
||||
return distribution.Descriptor{}, fmt.Errorf("size is not a string")
|
||||
}
|
||||
size, err := strconv.ParseInt(sizeString, 10, 64)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
desc.Size = size
|
||||
if reply[2] != nil {
|
||||
mediaType, ok := reply[2].(string)
|
||||
if ok {
|
||||
desc.MediaType = mediaType
|
||||
}
|
||||
}
|
||||
return desc, nil
|
||||
}
|
||||
|
||||
@@ -124,25 +135,23 @@ func (rbds *redisBlobDescriptorService) SetDescriptor(ctx context.Context, dgst
|
||||
return err
|
||||
}
|
||||
|
||||
conn := rbds.pool.Get()
|
||||
defer conn.Close()
|
||||
if err := cache.ValidateDescriptor(desc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return rbds.setDescriptor(ctx, conn, dgst, desc)
|
||||
return rbds.setDescriptor(ctx, rbds.pool, dgst, desc)
|
||||
}
|
||||
|
||||
func (rbds *redisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
if _, err := conn.Do("HMSET", rbds.blobDescriptorHashKey(dgst),
|
||||
"digest", desc.Digest,
|
||||
"size", desc.Size); err != nil {
|
||||
return err
|
||||
func (rbds *redisBlobDescriptorService) setDescriptor(ctx context.Context, conn *redis.Client, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
cmd := rbds.pool.HMSet(ctx, rbds.blobDescriptorHashKey(dgst), "digest", desc.Digest.String(), "size", desc.Size)
|
||||
if cmd.Err() != nil {
|
||||
return cmd.Err()
|
||||
}
|
||||
|
||||
// Only set mediatype if not already set.
|
||||
if _, err := conn.Do("HSETNX", rbds.blobDescriptorHashKey(dgst),
|
||||
"mediatype", desc.MediaType); err != nil {
|
||||
return err
|
||||
cmd = rbds.pool.HSetNX(ctx, rbds.blobDescriptorHashKey(dgst), "mediatype", desc.MediaType)
|
||||
if cmd.Err() != nil {
|
||||
return cmd.Err()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -165,31 +174,27 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Stat(ctx context.Conte
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
conn := rsrbds.upstream.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
pool := rsrbds.upstream.pool
|
||||
// Check membership to repository first
|
||||
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
|
||||
member, err := pool.SIsMember(ctx, rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst.String()).Result()
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
if !member {
|
||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
upstream, err := rsrbds.upstream.stat(ctx, conn, dgst)
|
||||
upstream, err := rsrbds.upstream.stat(ctx, pool, dgst)
|
||||
if err != nil {
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
// We allow a per repository mediatype, let's look it up here.
|
||||
mediatype, err := redis.String(conn.Do("HGET", rsrbds.blobDescriptorHashKey(dgst), "mediatype"))
|
||||
mediatype, err := pool.HGet(ctx, rsrbds.blobDescriptorHashKey(dgst), "mediatype").Result()
|
||||
if err != nil {
|
||||
if err == redis.ErrNil {
|
||||
if err == redis.Nil {
|
||||
return distribution.Descriptor{}, distribution.ErrBlobUnknown
|
||||
}
|
||||
|
||||
return distribution.Descriptor{}, err
|
||||
}
|
||||
|
||||
@@ -206,15 +211,11 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) Clear(ctx context.Cont
|
||||
return err
|
||||
}
|
||||
|
||||
conn := rsrbds.upstream.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
// Check membership to repository first
|
||||
member, err := redis.Bool(conn.Do("SISMEMBER", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst))
|
||||
member, err := rsrbds.upstream.pool.SIsMember(ctx, rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst.String()).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !member {
|
||||
return distribution.ErrBlobUnknown
|
||||
}
|
||||
@@ -237,14 +238,12 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) SetDescriptor(ctx cont
|
||||
}
|
||||
}
|
||||
|
||||
conn := rsrbds.upstream.pool.Get()
|
||||
defer conn.Close()
|
||||
|
||||
return rsrbds.setDescriptor(ctx, conn, dgst, desc)
|
||||
return rsrbds.setDescriptor(ctx, rsrbds.upstream.pool, dgst, desc)
|
||||
}
|
||||
|
||||
func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx context.Context, conn redis.Conn, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
if _, err := conn.Do("SADD", rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst); err != nil {
|
||||
func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx context.Context, conn *redis.Client, dgst digest.Digest, desc distribution.Descriptor) error {
|
||||
_, err := conn.SAdd(ctx, rsrbds.repositoryBlobSetKey(rsrbds.repo), dgst.String()).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -253,7 +252,8 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) setDescriptor(ctx cont
|
||||
}
|
||||
|
||||
// Override repository mediatype.
|
||||
if _, err := conn.Do("HSET", rsrbds.blobDescriptorHashKey(dgst), "mediatype", desc.MediaType); err != nil {
|
||||
_, err = conn.HSet(ctx, rsrbds.blobDescriptorHashKey(dgst), "mediatype", desc.MediaType).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
29
registry/storage/cache/redis/redis_test.go
vendored
29
registry/storage/cache/redis/redis_test.go
vendored
@@ -1,13 +1,13 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/distribution/distribution/v3/registry/storage/cache/cachecheck"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var redisAddr string
|
||||
@@ -29,25 +29,22 @@ func TestRedisBlobDescriptorCacheProvider(t *testing.T) {
|
||||
t.Skip("please set -test.registry.storage.cache.redis.addr to test layer info cache against redis")
|
||||
}
|
||||
|
||||
pool := &redis.Pool{
|
||||
Dial: func() (redis.Conn, error) {
|
||||
return redis.Dial("tcp", redisAddr)
|
||||
pool := redis.NewClient(&redis.Options{
|
||||
Addr: redisAddr,
|
||||
OnConnect: func(ctx context.Context, cn *redis.Conn) error {
|
||||
res := cn.Ping(ctx)
|
||||
return res.Err()
|
||||
},
|
||||
MaxIdle: 1,
|
||||
MaxActive: 2,
|
||||
TestOnBorrow: func(c redis.Conn, t time.Time) error {
|
||||
_, err := c.Do("PING")
|
||||
return err
|
||||
},
|
||||
Wait: false, // if a connection is not available, proceed without cache.
|
||||
}
|
||||
MaxRetries: 3,
|
||||
PoolSize: 2,
|
||||
})
|
||||
|
||||
// Clear the database
|
||||
conn := pool.Get()
|
||||
if _, err := conn.Do("FLUSHDB"); err != nil {
|
||||
ctx := context.Background()
|
||||
err := pool.FlushDB(ctx).Err()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error flushing redis db: %v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
cachecheck.CheckBlobDescriptorCache(t, NewRedisBlobDescriptorCacheProvider(pool))
|
||||
}
|
||||
|
Reference in New Issue
Block a user