Move registry client internal

Our registry client is not currently in a good place to be used as the
reference OCI Distribution client implementation. But the registry proxy
currently depends on it. Make the registry client internal to the
distribution application to remove it from the API surface area (and any
implied compatibility promises) of distribution/v3@v3.0.0 without
breaking the proxy.

Signed-off-by: Cory Snider <csnider@mirantis.com>
This commit is contained in:
Cory Snider
2023-10-24 12:26:38 -04:00
parent 708bc6f3e9
commit cc23fdacff
18 changed files with 15 additions and 15 deletions

View File

@@ -0,0 +1,58 @@
package auth
import (
"net/http"
"strings"
)
// APIVersion represents a version of an API including its
// type and version number.
type APIVersion struct {
// Type refers to the name of a specific API specification
// such as "registry"
Type string
// Version is the version of the API specification implemented,
// This may omit the revision number and only include
// the major and minor version, such as "2.0"
Version string
}
// String returns the string formatted API Version
func (v APIVersion) String() string {
return v.Type + "/" + v.Version
}
// APIVersions gets the API versions out of an HTTP response using the provided
// version header as the key for the HTTP header.
func APIVersions(resp *http.Response, versionHeader string) []APIVersion {
versions := []APIVersion{}
if versionHeader != "" {
for _, supportedVersions := range resp.Header[http.CanonicalHeaderKey(versionHeader)] {
for _, version := range strings.Fields(supportedVersions) {
versions = append(versions, ParseAPIVersion(version))
}
}
}
return versions
}
// ParseAPIVersion parses an API version string into an APIVersion
// Format (Expected, not enforced):
// API version string = <API type> '/' <API version>
// API type = [a-z][a-z0-9]*
// API version = [0-9]+(\.[0-9]+)?
// TODO(dmcgowan): Enforce format, add error condition, remove unknown type
func ParseAPIVersion(versionStr string) APIVersion {
idx := strings.IndexRune(versionStr, '/')
if idx == -1 {
return APIVersion{
Type: "unknown",
Version: versionStr,
}
}
return APIVersion{
Type: strings.ToLower(versionStr[:idx]),
Version: versionStr[idx+1:],
}
}

View File

@@ -0,0 +1,27 @@
package challenge
import (
"net/url"
"strings"
)
// FROM: https://golang.org/src/net/http/http.go
// Given a string of the form "host", "host:port", or "[ipv6::address]:port",
// return true if the string includes a port.
func hasPort(s string) bool { return strings.LastIndex(s, ":") > strings.LastIndex(s, "]") }
// FROM: http://golang.org/src/net/http/transport.go
var portMap = map[string]string{
"http": "80",
"https": "443",
}
// canonicalAddr returns url.Host but always with a ":port" suffix
// FROM: http://golang.org/src/net/http/transport.go
func canonicalAddr(url *url.URL) string {
addr := url.Host
if !hasPort(addr) {
return addr + ":" + portMap[url.Scheme]
}
return addr
}

View File

@@ -0,0 +1,237 @@
package challenge
import (
"fmt"
"net/http"
"net/url"
"strings"
"sync"
)
// Challenge carries information from a WWW-Authenticate response header.
// See RFC 2617.
type Challenge struct {
// Scheme is the auth-scheme according to RFC 2617
Scheme string
// Parameters are the auth-params according to RFC 2617
Parameters map[string]string
}
// Manager manages the challenges for endpoints.
// The challenges are pulled out of HTTP responses. Only
// responses which expect challenges should be added to
// the manager, since a non-unauthorized request will be
// viewed as not requiring challenges.
type Manager interface {
// GetChallenges returns the challenges for the given
// endpoint URL.
GetChallenges(endpoint url.URL) ([]Challenge, error)
// AddResponse adds the response to the challenge
// manager. The challenges will be parsed out of
// the WWW-Authenicate headers and added to the
// URL which was produced the response. If the
// response was authorized, any challenges for the
// endpoint will be cleared.
AddResponse(resp *http.Response) error
}
// NewSimpleManager returns an instance of
// Manager which only maps endpoints to challenges
// based on the responses which have been added the
// manager. The simple manager will make no attempt to
// perform requests on the endpoints or cache the responses
// to a backend.
func NewSimpleManager() Manager {
return &simpleManager{
Challenges: make(map[string][]Challenge),
}
}
type simpleManager struct {
sync.RWMutex
Challenges map[string][]Challenge
}
func normalizeURL(endpoint *url.URL) {
endpoint.Host = strings.ToLower(endpoint.Host)
endpoint.Host = canonicalAddr(endpoint)
}
func (m *simpleManager) GetChallenges(endpoint url.URL) ([]Challenge, error) {
normalizeURL(&endpoint)
m.RLock()
defer m.RUnlock()
challenges := m.Challenges[endpoint.String()]
return challenges, nil
}
func (m *simpleManager) AddResponse(resp *http.Response) error {
challenges := ResponseChallenges(resp)
if resp.Request == nil {
return fmt.Errorf("missing request reference")
}
urlCopy := url.URL{
Path: resp.Request.URL.Path,
Host: resp.Request.URL.Host,
Scheme: resp.Request.URL.Scheme,
}
normalizeURL(&urlCopy)
m.Lock()
defer m.Unlock()
m.Challenges[urlCopy.String()] = challenges
return nil
}
// Octet types from RFC 2616.
type octetType byte
var octetTypes [256]octetType
const (
isToken octetType = 1 << iota
isSpace
)
func init() {
// OCTET = <any 8-bit sequence of data>
// CHAR = <any US-ASCII character (octets 0 - 127)>
// CTL = <any US-ASCII control character (octets 0 - 31) and DEL (127)>
// CR = <US-ASCII CR, carriage return (13)>
// LF = <US-ASCII LF, linefeed (10)>
// SP = <US-ASCII SP, space (32)>
// HT = <US-ASCII HT, horizontal-tab (9)>
// <"> = <US-ASCII double-quote mark (34)>
// CRLF = CR LF
// LWS = [CRLF] 1*( SP | HT )
// TEXT = <any OCTET except CTLs, but including LWS>
// separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <">
// | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT
// token = 1*<any CHAR except CTLs or separators>
// qdtext = <any TEXT except <">>
for c := 0; c < 256; c++ {
var t octetType
isCtl := c <= 31 || c == 127
isChar := 0 <= c && c <= 127
isSeparator := strings.ContainsRune(" \t\"(),/:;<=>?@[]\\{}", rune(c))
if strings.ContainsRune(" \t\r\n", rune(c)) {
t |= isSpace
}
if isChar && !isCtl && !isSeparator {
t |= isToken
}
octetTypes[c] = t
}
}
// ResponseChallenges returns a list of authorization challenges
// for the given http Response. Challenges are only checked if
// the response status code was a 401.
func ResponseChallenges(resp *http.Response) []Challenge {
if resp.StatusCode == http.StatusUnauthorized {
// Parse the WWW-Authenticate Header and store the challenges
// on this endpoint object.
return parseAuthHeader(resp.Header)
}
return nil
}
func parseAuthHeader(header http.Header) []Challenge {
challenges := []Challenge{}
for _, h := range header[http.CanonicalHeaderKey("WWW-Authenticate")] {
v, p := parseValueAndParams(h)
if v != "" {
challenges = append(challenges, Challenge{Scheme: v, Parameters: p})
}
}
return challenges
}
func parseValueAndParams(header string) (value string, params map[string]string) {
params = make(map[string]string)
value, s := expectToken(header)
if value == "" {
return
}
value = strings.ToLower(value)
s = "," + skipSpace(s)
for strings.HasPrefix(s, ",") {
var pkey string
pkey, s = expectToken(skipSpace(s[1:]))
if pkey == "" {
return
}
if !strings.HasPrefix(s, "=") {
return
}
var pvalue string
pvalue, s = expectTokenOrQuoted(s[1:])
if pvalue == "" {
return
}
pkey = strings.ToLower(pkey)
params[pkey] = pvalue
s = skipSpace(s)
}
return
}
func skipSpace(s string) (rest string) {
i := 0
for ; i < len(s); i++ {
if octetTypes[s[i]]&isSpace == 0 {
break
}
}
return s[i:]
}
func expectToken(s string) (token, rest string) {
i := 0
for ; i < len(s); i++ {
if octetTypes[s[i]]&isToken == 0 {
break
}
}
return s[:i], s[i:]
}
func expectTokenOrQuoted(s string) (value string, rest string) {
if !strings.HasPrefix(s, "\"") {
return expectToken(s)
}
s = s[1:]
for i := 0; i < len(s); i++ {
switch s[i] {
case '"':
return s[:i], s[i+1:]
case '\\':
p := make([]byte, len(s)-1)
j := copy(p, s[:i])
escape := true
for i = i + 1; i < len(s); i++ {
b := s[i]
switch {
case escape:
escape = false
p[j] = b
j++
case b == '\\':
escape = true
case b == '"':
return string(p[:j]), s[i+1:]
default:
p[j] = b
j++
}
}
return "", ""
}
}
return "", ""
}

View File

@@ -0,0 +1,124 @@
package challenge
import (
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"testing"
)
func TestAuthChallengeParse(t *testing.T) {
header := http.Header{}
header.Add("WWW-Authenticate", `Bearer realm="https://auth.example.com/token",service="registry.example.com",other=fun,slashed="he\"\l\lo"`)
challenges := parseAuthHeader(header)
if len(challenges) != 1 {
t.Fatalf("Unexpected number of auth challenges: %d, expected 1", len(challenges))
}
challenge := challenges[0]
if expected := "bearer"; challenge.Scheme != expected {
t.Fatalf("Unexpected scheme: %s, expected: %s", challenge.Scheme, expected)
}
if expected := "https://auth.example.com/token"; challenge.Parameters["realm"] != expected {
t.Fatalf("Unexpected param: %s, expected: %s", challenge.Parameters["realm"], expected)
}
if expected := "registry.example.com"; challenge.Parameters["service"] != expected {
t.Fatalf("Unexpected param: %s, expected: %s", challenge.Parameters["service"], expected)
}
if expected := "fun"; challenge.Parameters["other"] != expected {
t.Fatalf("Unexpected param: %s, expected: %s", challenge.Parameters["other"], expected)
}
if expected := "he\"llo"; challenge.Parameters["slashed"] != expected {
t.Fatalf("Unexpected param: %s, expected: %s", challenge.Parameters["slashed"], expected)
}
}
func TestAuthChallengeNormalization(t *testing.T) {
testAuthChallengeNormalization(t, "reg.EXAMPLE.com")
testAuthChallengeNormalization(t, "bɿɒʜɔiɿ-ɿɘƚƨim-ƚol-ɒ-ƨʞnɒʜƚ.com")
testAuthChallengeNormalization(t, "reg.example.com:80")
testAuthChallengeConcurrent(t, "reg.EXAMPLE.com")
}
func testAuthChallengeNormalization(t *testing.T, host string) {
scm := NewSimpleManager()
url, err := url.Parse(fmt.Sprintf("http://%s/v2/", host))
if err != nil {
t.Fatal(err)
}
resp := &http.Response{
Request: &http.Request{
URL: url,
},
Header: make(http.Header),
StatusCode: http.StatusUnauthorized,
}
resp.Header.Add("WWW-Authenticate", fmt.Sprintf("Bearer realm=\"https://%s/token\",service=\"registry.example.com\"", host))
err = scm.AddResponse(resp)
if err != nil {
t.Fatal(err)
}
lowered := *url
lowered.Host = strings.ToLower(lowered.Host)
lowered.Host = canonicalAddr(&lowered)
c, err := scm.GetChallenges(lowered)
if err != nil {
t.Fatal(err)
}
if len(c) == 0 {
t.Fatal("Expected challenge for lower-cased-host URL")
}
}
func testAuthChallengeConcurrent(t *testing.T, host string) {
scm := NewSimpleManager()
url, err := url.Parse(fmt.Sprintf("http://%s/v2/", host))
if err != nil {
t.Fatal(err)
}
resp := &http.Response{
Request: &http.Request{
URL: url,
},
Header: make(http.Header),
StatusCode: http.StatusUnauthorized,
}
resp.Header.Add("WWW-Authenticate", fmt.Sprintf("Bearer realm=\"https://%s/token\",service=\"registry.example.com\"", host))
var s sync.WaitGroup
s.Add(2)
go func() {
defer s.Done()
for i := 0; i < 200; i++ {
err = scm.AddResponse(resp)
if err != nil {
t.Error(err)
}
}
}()
go func() {
defer s.Done()
lowered := *url
lowered.Host = strings.ToLower(lowered.Host)
for k := 0; k < 200; k++ {
_, err := scm.GetChallenges(lowered)
if err != nil {
t.Error(err)
}
}
}()
s.Wait()
}

View File

@@ -0,0 +1,537 @@
package auth
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
"sync"
"time"
"github.com/distribution/distribution/v3/internal/client"
"github.com/distribution/distribution/v3/internal/client/auth/challenge"
"github.com/distribution/distribution/v3/internal/client/transport"
)
var (
// ErrNoBasicAuthCredentials is returned if a request can't be authorized with
// basic auth due to lack of credentials.
ErrNoBasicAuthCredentials = errors.New("no basic auth credentials")
// ErrNoToken is returned if a request is successful but the body does not
// contain an authorization token.
ErrNoToken = errors.New("authorization server did not include a token in the response")
)
const defaultClientID = "registry-client"
// AuthenticationHandler is an interface for authorizing a request from
// params from a "WWW-Authenicate" header for a single scheme.
type AuthenticationHandler interface {
// Scheme returns the scheme as expected from the "WWW-Authenicate" header.
Scheme() string
// AuthorizeRequest adds the authorization header to a request (if needed)
// using the parameters from "WWW-Authenticate" method. The parameters
// values depend on the scheme.
AuthorizeRequest(req *http.Request, params map[string]string) error
}
// CredentialStore is an interface for getting credentials for
// a given URL
type CredentialStore interface {
// Basic returns basic auth for the given URL
Basic(*url.URL) (string, string)
// RefreshToken returns a refresh token for the
// given URL and service
RefreshToken(*url.URL, string) string
// SetRefreshToken sets the refresh token if none
// is provided for the given url and service
SetRefreshToken(realm *url.URL, service, token string)
}
// NewAuthorizer creates an authorizer which can handle multiple authentication
// schemes. The handlers are tried in order, the higher priority authentication
// methods should be first. The challengeMap holds a list of challenges for
// a given root API endpoint (for example "https://registry-1.docker.io/v2/").
func NewAuthorizer(manager challenge.Manager, handlers ...AuthenticationHandler) transport.RequestModifier {
return &endpointAuthorizer{
challenges: manager,
handlers: handlers,
}
}
type endpointAuthorizer struct {
challenges challenge.Manager
handlers []AuthenticationHandler
}
func (ea *endpointAuthorizer) ModifyRequest(req *http.Request) error {
pingPath := req.URL.Path
if v2Root := strings.Index(req.URL.Path, "/v2/"); v2Root != -1 {
pingPath = pingPath[:v2Root+4]
} else if v1Root := strings.Index(req.URL.Path, "/v1/"); v1Root != -1 {
pingPath = pingPath[:v1Root] + "/v2/"
} else {
return nil
}
ping := url.URL{
Host: req.URL.Host,
Scheme: req.URL.Scheme,
Path: pingPath,
}
challenges, err := ea.challenges.GetChallenges(ping)
if err != nil {
return err
}
if len(challenges) > 0 {
for _, handler := range ea.handlers {
for _, c := range challenges {
if c.Scheme != handler.Scheme() {
continue
}
if err := handler.AuthorizeRequest(req, c.Parameters); err != nil {
return err
}
}
}
}
return nil
}
// This is the minimum duration a token can last (in seconds).
// A token must not live less than 60 seconds because older versions
// of the Docker client didn't read their expiration from the token
// response and assumed 60 seconds. So to remain compatible with
// those implementations, a token must live at least this long.
const minimumTokenLifetimeSeconds = 60
// Private interface for time used by this package to enable tests to provide their own implementation.
type clock interface {
Now() time.Time
}
type tokenHandler struct {
creds CredentialStore
transport http.RoundTripper
clock clock
offlineAccess bool
forceOAuth bool
clientID string
scopes []Scope
tokenLock sync.Mutex
tokenCache string
tokenExpiration time.Time
logger Logger
}
// Scope is a type which is serializable to a string
// using the allow scope grammar.
type Scope interface {
String() string
}
// RepositoryScope represents a token scope for access
// to a repository.
type RepositoryScope struct {
Repository string
Class string
Actions []string
}
// String returns the string representation of the repository
// using the scope grammar
func (rs RepositoryScope) String() string {
repoType := "repository"
// Keep existing format for image class to maintain backwards compatibility
// with authorization servers which do not support the expanded grammar.
if rs.Class != "" && rs.Class != "image" {
repoType = fmt.Sprintf("%s(%s)", repoType, rs.Class)
}
return fmt.Sprintf("%s:%s:%s", repoType, rs.Repository, strings.Join(rs.Actions, ","))
}
// RegistryScope represents a token scope for access
// to resources in the registry.
type RegistryScope struct {
Name string
Actions []string
}
// String returns the string representation of the user
// using the scope grammar
func (rs RegistryScope) String() string {
return fmt.Sprintf("registry:%s:%s", rs.Name, strings.Join(rs.Actions, ","))
}
// Logger defines the injectable logging interface, used on TokenHandlers.
type Logger interface {
Debugf(format string, args ...interface{})
}
func logDebugf(logger Logger, format string, args ...interface{}) {
if logger == nil {
return
}
logger.Debugf(format, args...)
}
// TokenHandlerOptions is used to configure a new token handler
type TokenHandlerOptions struct {
Transport http.RoundTripper
Credentials CredentialStore
OfflineAccess bool
ForceOAuth bool
ClientID string
Scopes []Scope
Logger Logger
}
// An implementation of clock for providing real time data.
type realClock struct{}
// Now implements clock
func (realClock) Now() time.Time { return time.Now() }
// NewTokenHandler creates a new AuthenicationHandler which supports
// fetching tokens from a remote token server.
func NewTokenHandler(transport http.RoundTripper, creds CredentialStore, scope string, actions ...string) AuthenticationHandler {
// Create options...
return NewTokenHandlerWithOptions(TokenHandlerOptions{
Transport: transport,
Credentials: creds,
Scopes: []Scope{
RepositoryScope{
Repository: scope,
Actions: actions,
},
},
})
}
// NewTokenHandlerWithOptions creates a new token handler using the provided
// options structure.
func NewTokenHandlerWithOptions(options TokenHandlerOptions) AuthenticationHandler {
handler := &tokenHandler{
transport: options.Transport,
creds: options.Credentials,
offlineAccess: options.OfflineAccess,
forceOAuth: options.ForceOAuth,
clientID: options.ClientID,
scopes: options.Scopes,
clock: realClock{},
logger: options.Logger,
}
return handler
}
func (th *tokenHandler) client() *http.Client {
return &http.Client{
Transport: th.transport,
Timeout: 15 * time.Second,
}
}
func (th *tokenHandler) Scheme() string {
return "bearer"
}
func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error {
var additionalScopes []string
if fromParam := req.URL.Query().Get("from"); fromParam != "" {
additionalScopes = append(additionalScopes, RepositoryScope{
Repository: fromParam,
Actions: []string{"pull"},
}.String())
}
token, err := th.getToken(req.Context(), params, additionalScopes...)
if err != nil {
return err
}
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
return nil
}
func (th *tokenHandler) getToken(ctx context.Context, params map[string]string, additionalScopes ...string) (string, error) {
th.tokenLock.Lock()
defer th.tokenLock.Unlock()
scopes := make([]string, 0, len(th.scopes)+len(additionalScopes))
for _, scope := range th.scopes {
scopes = append(scopes, scope.String())
}
var addedScopes bool
for _, scope := range additionalScopes {
if hasScope(scopes, scope) {
continue
}
scopes = append(scopes, scope)
addedScopes = true
}
now := th.clock.Now()
if now.After(th.tokenExpiration) || addedScopes {
token, expiration, err := th.fetchToken(ctx, params, scopes)
if err != nil {
return "", err
}
// do not update cache for added scope tokens
if !addedScopes {
th.tokenCache = token
th.tokenExpiration = expiration
}
return token, nil
}
return th.tokenCache, nil
}
func hasScope(scopes []string, scope string) bool {
for _, s := range scopes {
if s == scope {
return true
}
}
return false
}
type postTokenResponse struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
ExpiresIn int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
Scope string `json:"scope"`
}
func (th *tokenHandler) fetchTokenWithOAuth(ctx context.Context, realm *url.URL, refreshToken, service string, scopes []string) (token string, expiration time.Time, err error) {
form := url.Values{}
form.Set("scope", strings.Join(scopes, " "))
form.Set("service", service)
clientID := th.clientID
if clientID == "" {
// Use default client, this is a required field
clientID = defaultClientID
}
form.Set("client_id", clientID)
if refreshToken != "" {
form.Set("grant_type", "refresh_token")
form.Set("refresh_token", refreshToken)
} else if th.creds != nil {
form.Set("grant_type", "password")
username, password := th.creds.Basic(realm)
form.Set("username", username)
form.Set("password", password)
// attempt to get a refresh token
form.Set("access_type", "offline")
} else {
// refuse to do oauth without a grant type
return "", time.Time{}, fmt.Errorf("no supported grant type")
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, realm.String(), strings.NewReader(form.Encode()))
if err != nil {
return "", time.Time{}, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := th.client().Do(req)
if err != nil {
return "", time.Time{}, err
}
defer resp.Body.Close()
if err := client.HandleHTTPResponseError(resp); err != nil {
return "", time.Time{}, err
}
decoder := json.NewDecoder(resp.Body)
var tr postTokenResponse
if err = decoder.Decode(&tr); err != nil {
return "", time.Time{}, fmt.Errorf("unable to decode token response: %s", err)
}
if tr.AccessToken == "" {
return "", time.Time{}, ErrNoToken
}
if tr.RefreshToken != "" && tr.RefreshToken != refreshToken {
th.creds.SetRefreshToken(realm, service, tr.RefreshToken)
}
if tr.ExpiresIn < minimumTokenLifetimeSeconds {
// The default/minimum lifetime.
tr.ExpiresIn = minimumTokenLifetimeSeconds
logDebugf(th.logger, "Increasing token expiration to: %d seconds", tr.ExpiresIn)
}
if tr.IssuedAt.IsZero() {
// issued_at is optional in the token response.
tr.IssuedAt = th.clock.Now().UTC()
}
return tr.AccessToken, tr.IssuedAt.Add(time.Duration(tr.ExpiresIn) * time.Second), nil
}
type getTokenResponse struct {
Token string `json:"token"`
AccessToken string `json:"access_token"`
ExpiresIn int `json:"expires_in"`
IssuedAt time.Time `json:"issued_at"`
RefreshToken string `json:"refresh_token"`
}
func (th *tokenHandler) fetchTokenWithBasicAuth(ctx context.Context, realm *url.URL, service string, scopes []string) (token string, expiration time.Time, err error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, realm.String(), nil)
if err != nil {
return "", time.Time{}, err
}
reqParams := req.URL.Query()
if service != "" {
reqParams.Add("service", service)
}
for _, scope := range scopes {
reqParams.Add("scope", scope)
}
if th.offlineAccess {
reqParams.Add("offline_token", "true")
clientID := th.clientID
if clientID == "" {
clientID = defaultClientID
}
reqParams.Add("client_id", clientID)
}
if th.creds != nil {
username, password := th.creds.Basic(realm)
if username != "" && password != "" {
reqParams.Add("account", username)
req.SetBasicAuth(username, password)
}
}
req.URL.RawQuery = reqParams.Encode()
resp, err := th.client().Do(req)
if err != nil {
return "", time.Time{}, err
}
defer resp.Body.Close()
if err := client.HandleHTTPResponseError(resp); err != nil {
return "", time.Time{}, err
}
decoder := json.NewDecoder(resp.Body)
var tr getTokenResponse
if err = decoder.Decode(&tr); err != nil {
return "", time.Time{}, fmt.Errorf("unable to decode token response: %s", err)
}
if tr.RefreshToken != "" && th.creds != nil {
th.creds.SetRefreshToken(realm, service, tr.RefreshToken)
}
// `access_token` is equivalent to `token` and if both are specified
// the choice is undefined. Canonicalize `access_token` by sticking
// things in `token`.
if tr.AccessToken != "" {
tr.Token = tr.AccessToken
}
if tr.Token == "" {
return "", time.Time{}, ErrNoToken
}
if tr.ExpiresIn < minimumTokenLifetimeSeconds {
// The default/minimum lifetime.
tr.ExpiresIn = minimumTokenLifetimeSeconds
logDebugf(th.logger, "Increasing token expiration to: %d seconds", tr.ExpiresIn)
}
if tr.IssuedAt.IsZero() {
// issued_at is optional in the token response.
tr.IssuedAt = th.clock.Now().UTC()
}
return tr.Token, tr.IssuedAt.Add(time.Duration(tr.ExpiresIn) * time.Second), nil
}
func (th *tokenHandler) fetchToken(ctx context.Context, params map[string]string, scopes []string) (token string, expiration time.Time, err error) {
realm, ok := params["realm"]
if !ok {
return "", time.Time{}, errors.New("no realm specified for token auth challenge")
}
// TODO(dmcgowan): Handle empty scheme and relative realm
realmURL, err := url.Parse(realm)
if err != nil {
return "", time.Time{}, fmt.Errorf("invalid token auth challenge realm: %s", err)
}
service := params["service"]
var refreshToken string
if th.creds != nil {
refreshToken = th.creds.RefreshToken(realmURL, service)
}
if refreshToken != "" || th.forceOAuth {
return th.fetchTokenWithOAuth(ctx, realmURL, refreshToken, service, scopes)
}
return th.fetchTokenWithBasicAuth(ctx, realmURL, service, scopes)
}
type basicHandler struct {
creds CredentialStore
}
// NewBasicHandler creaters a new authentiation handler which adds
// basic authentication credentials to a request.
func NewBasicHandler(creds CredentialStore) AuthenticationHandler {
return &basicHandler{
creds: creds,
}
}
func (*basicHandler) Scheme() string {
return "basic"
}
func (bh *basicHandler) AuthorizeRequest(req *http.Request, params map[string]string) error {
if bh.creds != nil {
username, password := bh.creds.Basic(req.URL)
if username != "" && password != "" {
req.SetBasicAuth(username, password)
return nil
}
}
return ErrNoBasicAuthCredentials
}

View File

@@ -0,0 +1,877 @@
package auth
import (
"encoding/base64"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/distribution/distribution/v3/internal/client/auth/challenge"
"github.com/distribution/distribution/v3/internal/client/transport"
"github.com/distribution/distribution/v3/testutil"
)
// An implementation of clock for providing fake time data.
type fakeClock struct {
current time.Time
}
// Now implements clock
func (fc *fakeClock) Now() time.Time { return fc.current }
func testServer(rrm testutil.RequestResponseMap) (string, func()) {
h := testutil.NewHandler(rrm)
s := httptest.NewServer(h)
return s.URL, s.Close
}
type testAuthenticationWrapper struct {
headers http.Header
authCheck func(string) bool
next http.Handler
}
func (w *testAuthenticationWrapper) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
auth := r.Header.Get("Authorization")
if auth == "" || !w.authCheck(auth) {
h := rw.Header()
for k, values := range w.headers {
h[k] = values
}
rw.WriteHeader(http.StatusUnauthorized)
return
}
w.next.ServeHTTP(rw, r)
}
func testServerWithAuth(rrm testutil.RequestResponseMap, authenticate string, authCheck func(string) bool) (string, func()) {
h := testutil.NewHandler(rrm)
wrapper := &testAuthenticationWrapper{
headers: http.Header(map[string][]string{
"X-API-Version": {"registry/2.0"},
"X-Multi-API-Version": {"registry/2.0", "registry/2.1", "trust/1.0"},
"WWW-Authenticate": {authenticate},
}),
authCheck: authCheck,
next: h,
}
s := httptest.NewServer(wrapper)
return s.URL, s.Close
}
// ping pings the provided endpoint to determine its required authorization challenges.
// If a version header is provided, the versions will be returned.
func ping(manager challenge.Manager, endpoint, versionHeader string) ([]APIVersion, error) {
resp, err := http.Get(endpoint)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if err := manager.AddResponse(resp); err != nil {
return nil, err
}
return APIVersions(resp, versionHeader), err
}
type testCredentialStore struct {
username string
password string
refreshTokens map[string]string
}
func (tcs *testCredentialStore) Basic(*url.URL) (string, string) {
return tcs.username, tcs.password
}
func (tcs *testCredentialStore) RefreshToken(u *url.URL, service string) string {
return tcs.refreshTokens[service]
}
func (tcs *testCredentialStore) SetRefreshToken(u *url.URL, service string, token string) {
if tcs.refreshTokens != nil {
tcs.refreshTokens[service] = token
}
}
func TestEndpointAuthorizeToken(t *testing.T) {
service := "localhost.localdomain"
repo1 := "some/registry"
repo2 := "other/registry"
scope1 := fmt.Sprintf("repository:%s:pull,push", repo1)
scope2 := fmt.Sprintf("repository:%s:pull,push", repo2)
tokenMap := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: fmt.Sprintf("/token?scope=%s&service=%s", url.QueryEscape(scope1), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"token":"statictoken"}`),
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: fmt.Sprintf("/token?scope=%s&service=%s", url.QueryEscape(scope2), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"token":"badtoken"}`),
},
},
})
te, tc := testServer(tokenMap)
defer tc()
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
authenicate := fmt.Sprintf("Bearer realm=%q,service=%q", te+"/token", service)
validCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e, c := testServerWithAuth(m, authenicate, validCheck)
defer c()
challengeManager1 := challenge.NewSimpleManager()
versions, err := ping(challengeManager1, e+"/v2/", "x-api-version")
if err != nil {
t.Fatal(err)
}
if len(versions) != 1 {
t.Fatalf("Unexpected version count: %d, expected 1", len(versions))
}
if check := (APIVersion{Type: "registry", Version: "2.0"}); versions[0] != check {
t.Fatalf("Unexpected api version: %#v, expected %#v", versions[0], check)
}
transport1 := transport.NewTransport(nil, NewAuthorizer(challengeManager1, NewTokenHandler(nil, nil, repo1, "pull", "push")))
client := &http.Client{Transport: transport1}
req, _ := http.NewRequest(http.MethodGet, e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
e2, c2 := testServerWithAuth(m, authenicate, validCheck)
defer c2()
challengeManager2 := challenge.NewSimpleManager()
versions, err = ping(challengeManager2, e2+"/v2/", "x-multi-api-version")
if err != nil {
t.Fatal(err)
}
if len(versions) != 3 {
t.Fatalf("Unexpected version count: %d, expected 3", len(versions))
}
if check := (APIVersion{Type: "registry", Version: "2.0"}); versions[0] != check {
t.Fatalf("Unexpected api version: %#v, expected %#v", versions[0], check)
}
if check := (APIVersion{Type: "registry", Version: "2.1"}); versions[1] != check {
t.Fatalf("Unexpected api version: %#v, expected %#v", versions[1], check)
}
if check := (APIVersion{Type: "trust", Version: "1.0"}); versions[2] != check {
t.Fatalf("Unexpected api version: %#v, expected %#v", versions[2], check)
}
transport2 := transport.NewTransport(nil, NewAuthorizer(challengeManager2, NewTokenHandler(nil, nil, repo2, "pull", "push")))
client2 := &http.Client{Transport: transport2}
req, _ = http.NewRequest(http.MethodGet, e2+"/v2/hello", nil)
resp, err = client2.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusUnauthorized)
}
}
func TestEndpointAuthorizeRefreshToken(t *testing.T) {
service := "localhost.localdomain"
repo1 := "some/registry"
repo2 := "other/registry"
scope1 := fmt.Sprintf("repository:%s:pull,push", repo1)
scope2 := fmt.Sprintf("repository:%s:pull,push", repo2)
refreshToken1 := "0123456790abcdef"
refreshToken2 := "0123456790fedcba"
tokenMap := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodPost,
Route: "/token",
Body: []byte(fmt.Sprintf("client_id=registry-client&grant_type=refresh_token&refresh_token=%s&scope=%s&service=%s", refreshToken1, url.QueryEscape(scope1), service)),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(fmt.Sprintf(`{"access_token":"statictoken","refresh_token":"%s"}`, refreshToken1)),
},
},
{
// In the future this test may fail and require using basic auth to get a different refresh token
Request: testutil.Request{
Method: http.MethodPost,
Route: "/token",
Body: []byte(fmt.Sprintf("client_id=registry-client&grant_type=refresh_token&refresh_token=%s&scope=%s&service=%s", refreshToken1, url.QueryEscape(scope2), service)),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(fmt.Sprintf(`{"access_token":"statictoken","refresh_token":"%s"}`, refreshToken2)),
},
},
{
Request: testutil.Request{
Method: http.MethodPost,
Route: "/token",
Body: []byte(fmt.Sprintf("client_id=registry-client&grant_type=refresh_token&refresh_token=%s&scope=%s&service=%s", refreshToken2, url.QueryEscape(scope2), service)),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"access_token":"badtoken","refresh_token":"%s"}`),
},
},
})
te, tc := testServer(tokenMap)
defer tc()
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
authenicate := fmt.Sprintf("Bearer realm=%q,service=%q", te+"/token", service)
validCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e, c := testServerWithAuth(m, authenicate, validCheck)
defer c()
challengeManager1 := challenge.NewSimpleManager()
versions, err := ping(challengeManager1, e+"/v2/", "x-api-version")
if err != nil {
t.Fatal(err)
}
if len(versions) != 1 {
t.Fatalf("Unexpected version count: %d, expected 1", len(versions))
}
if check := (APIVersion{Type: "registry", Version: "2.0"}); versions[0] != check {
t.Fatalf("Unexpected api version: %#v, expected %#v", versions[0], check)
}
creds := &testCredentialStore{
refreshTokens: map[string]string{
service: refreshToken1,
},
}
transport1 := transport.NewTransport(nil, NewAuthorizer(challengeManager1, NewTokenHandler(nil, creds, repo1, "pull", "push")))
client := &http.Client{Transport: transport1}
req, _ := http.NewRequest(http.MethodGet, e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
// Try with refresh token setting
e2, c2 := testServerWithAuth(m, authenicate, validCheck)
defer c2()
challengeManager2 := challenge.NewSimpleManager()
versions, err = ping(challengeManager2, e2+"/v2/", "x-api-version")
if err != nil {
t.Fatal(err)
}
if len(versions) != 1 {
t.Fatalf("Unexpected version count: %d, expected 1", len(versions))
}
if check := (APIVersion{Type: "registry", Version: "2.0"}); versions[0] != check {
t.Fatalf("Unexpected api version: %#v, expected %#v", versions[0], check)
}
transport2 := transport.NewTransport(nil, NewAuthorizer(challengeManager2, NewTokenHandler(nil, creds, repo2, "pull", "push")))
client2 := &http.Client{Transport: transport2}
req, _ = http.NewRequest(http.MethodGet, e2+"/v2/hello", nil)
resp, err = client2.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusUnauthorized)
}
if creds.refreshTokens[service] != refreshToken2 {
t.Fatalf("Refresh token not set after change")
}
// Try with bad token
e3, c3 := testServerWithAuth(m, authenicate, validCheck)
defer c3()
challengeManager3 := challenge.NewSimpleManager()
versions, err = ping(challengeManager3, e3+"/v2/", "x-api-version")
if err != nil {
t.Fatal(err)
}
if check := (APIVersion{Type: "registry", Version: "2.0"}); versions[0] != check {
t.Fatalf("Unexpected api version: %#v, expected %#v", versions[0], check)
}
transport3 := transport.NewTransport(nil, NewAuthorizer(challengeManager3, NewTokenHandler(nil, creds, repo2, "pull", "push")))
client3 := &http.Client{Transport: transport3}
req, _ = http.NewRequest(http.MethodGet, e3+"/v2/hello", nil)
resp, err = client3.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusUnauthorized {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusUnauthorized)
}
}
func TestEndpointAuthorizeV2RefreshToken(t *testing.T) {
service := "localhost.localdomain"
scope1 := "registry:catalog:search"
refreshToken1 := "0123456790abcdef"
tokenMap := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodPost,
Route: "/token",
Body: []byte(fmt.Sprintf("client_id=registry-client&grant_type=refresh_token&refresh_token=%s&scope=%s&service=%s", refreshToken1, url.QueryEscape(scope1), service)),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(fmt.Sprintf(`{"access_token":"statictoken","refresh_token":"%s"}`, refreshToken1)),
},
},
})
te, tc := testServer(tokenMap)
defer tc()
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v1/search",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
authenicate := fmt.Sprintf("Bearer realm=%q,service=%q", te+"/token", service)
validCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e, c := testServerWithAuth(m, authenicate, validCheck)
defer c()
challengeManager1 := challenge.NewSimpleManager()
versions, err := ping(challengeManager1, e+"/v2/", "x-api-version")
if err != nil {
t.Fatal(err)
}
if len(versions) != 1 {
t.Fatalf("Unexpected version count: %d, expected 1", len(versions))
}
if check := (APIVersion{Type: "registry", Version: "2.0"}); versions[0] != check {
t.Fatalf("Unexpected api version: %#v, expected %#v", versions[0], check)
}
tho := TokenHandlerOptions{
Credentials: &testCredentialStore{
refreshTokens: map[string]string{
service: refreshToken1,
},
},
Scopes: []Scope{
RegistryScope{
Name: "catalog",
Actions: []string{"search"},
},
},
}
transport1 := transport.NewTransport(nil, NewAuthorizer(challengeManager1, NewTokenHandlerWithOptions(tho)))
client := &http.Client{Transport: transport1}
req, _ := http.NewRequest(http.MethodGet, e+"/v1/search", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
}
func basicAuth(username, password string) string {
auth := username + ":" + password
return base64.StdEncoding.EncodeToString([]byte(auth))
}
func TestEndpointAuthorizeTokenBasic(t *testing.T) {
service := "localhost.localdomain"
repo := "some/fun/registry"
scope := fmt.Sprintf("repository:%s:pull,push", repo)
username := "tokenuser"
password := "superSecretPa$$word"
tokenMap := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: fmt.Sprintf("/token?account=%s&scope=%s&service=%s", username, url.QueryEscape(scope), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"access_token":"statictoken"}`),
},
},
})
authenicate1 := "Basic realm=localhost"
basicCheck := func(a string) bool {
return a == fmt.Sprintf("Basic %s", basicAuth(username, password))
}
te, tc := testServerWithAuth(tokenMap, authenicate1, basicCheck)
defer tc()
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
authenicate2 := fmt.Sprintf("Bearer realm=%q,service=%q", te+"/token", service)
bearerCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e, c := testServerWithAuth(m, authenicate2, bearerCheck)
defer c()
creds := &testCredentialStore{
username: username,
password: password,
}
challengeManager := challenge.NewSimpleManager()
_, err := ping(challengeManager, e+"/v2/", "")
if err != nil {
t.Fatal(err)
}
transport1 := transport.NewTransport(nil, NewAuthorizer(challengeManager, NewTokenHandler(nil, creds, repo, "pull", "push"), NewBasicHandler(creds)))
client := &http.Client{Transport: transport1}
req, _ := http.NewRequest(http.MethodGet, e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
}
func TestEndpointAuthorizeTokenBasicWithExpiresIn(t *testing.T) {
service := "localhost.localdomain"
repo := "some/fun/registry"
scope := fmt.Sprintf("repository:%s:pull,push", repo)
username := "tokenuser"
password := "superSecretPa$$word"
tokenMap := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: fmt.Sprintf("/token?account=%s&scope=%s&service=%s", username, url.QueryEscape(scope), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"token":"statictoken", "expires_in": 3001}`),
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: fmt.Sprintf("/token?account=%s&scope=%s&service=%s", username, url.QueryEscape(scope), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"access_token":"statictoken", "expires_in": 3001}`),
},
},
})
authenicate1 := "Basic realm=localhost"
tokenExchanges := 0
basicCheck := func(a string) bool {
tokenExchanges = tokenExchanges + 1
return a == fmt.Sprintf("Basic %s", basicAuth(username, password))
}
te, tc := testServerWithAuth(tokenMap, authenicate1, basicCheck)
defer tc()
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
authenicate2 := fmt.Sprintf("Bearer realm=%q,service=%q", te+"/token", service)
bearerCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e, c := testServerWithAuth(m, authenicate2, bearerCheck)
defer c()
creds := &testCredentialStore{
username: username,
password: password,
}
challengeManager := challenge.NewSimpleManager()
_, err := ping(challengeManager, e+"/v2/", "")
if err != nil {
t.Fatal(err)
}
clock := &fakeClock{current: time.Now()}
options := TokenHandlerOptions{
Transport: nil,
Credentials: creds,
Scopes: []Scope{
RepositoryScope{
Repository: repo,
Actions: []string{"pull", "push"},
},
},
}
tHandler := NewTokenHandlerWithOptions(options)
tHandler.(*tokenHandler).clock = clock
transport1 := transport.NewTransport(nil, NewAuthorizer(challengeManager, tHandler, NewBasicHandler(creds)))
client := &http.Client{Transport: transport1}
// First call should result in a token exchange
// Subsequent calls should recycle the token from the first request, until the expiration has lapsed.
timeIncrement := 1000 * time.Second
for i := 0; i < 4; i++ {
req, _ := http.NewRequest(http.MethodGet, e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
if tokenExchanges != 1 {
t.Fatalf("Unexpected number of token exchanges, want: 1, got %d (iteration: %d)", tokenExchanges, i)
}
clock.current = clock.current.Add(timeIncrement)
}
// After we've exceeded the expiration, we should see a second token exchange.
req, _ := http.NewRequest(http.MethodGet, e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
if tokenExchanges != 2 {
t.Fatalf("Unexpected number of token exchanges, want: 2, got %d", tokenExchanges)
}
}
func TestEndpointAuthorizeTokenBasicWithExpiresInAndIssuedAt(t *testing.T) {
service := "localhost.localdomain"
repo := "some/fun/registry"
scope := fmt.Sprintf("repository:%s:pull,push", repo)
username := "tokenuser"
password := "superSecretPa$$word"
// This test sets things up such that the token was issued one increment
// earlier than its sibling in TestEndpointAuthorizeTokenBasicWithExpiresIn.
// This will mean that the token expires after 3 increments instead of 4.
clock := &fakeClock{current: time.Now()}
timeIncrement := 1000 * time.Second
firstIssuedAt := clock.Now()
clock.current = clock.current.Add(timeIncrement)
secondIssuedAt := clock.current.Add(2 * timeIncrement)
tokenMap := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: fmt.Sprintf("/token?account=%s&scope=%s&service=%s", username, url.QueryEscape(scope), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"token":"statictoken", "issued_at": "` + firstIssuedAt.Format(time.RFC3339Nano) + `", "expires_in": 3001}`),
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: fmt.Sprintf("/token?account=%s&scope=%s&service=%s", username, url.QueryEscape(scope), service),
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Body: []byte(`{"access_token":"statictoken", "issued_at": "` + secondIssuedAt.Format(time.RFC3339Nano) + `", "expires_in": 3001}`),
},
},
})
authenicate1 := "Basic realm=localhost"
tokenExchanges := 0
basicCheck := func(a string) bool {
tokenExchanges = tokenExchanges + 1
return a == fmt.Sprintf("Basic %s", basicAuth(username, password))
}
te, tc := testServerWithAuth(tokenMap, authenicate1, basicCheck)
defer tc()
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
authenicate2 := fmt.Sprintf("Bearer realm=%q,service=%q", te+"/token", service)
bearerCheck := func(a string) bool {
return a == "Bearer statictoken"
}
e, c := testServerWithAuth(m, authenicate2, bearerCheck)
defer c()
creds := &testCredentialStore{
username: username,
password: password,
}
challengeManager := challenge.NewSimpleManager()
_, err := ping(challengeManager, e+"/v2/", "")
if err != nil {
t.Fatal(err)
}
options := TokenHandlerOptions{
Transport: nil,
Credentials: creds,
Scopes: []Scope{
RepositoryScope{
Repository: repo,
Actions: []string{"pull", "push"},
},
},
}
tHandler := NewTokenHandlerWithOptions(options)
tHandler.(*tokenHandler).clock = clock
transport1 := transport.NewTransport(nil, NewAuthorizer(challengeManager, tHandler, NewBasicHandler(creds)))
client := &http.Client{Transport: transport1}
// First call should result in a token exchange
// Subsequent calls should recycle the token from the first request, until the expiration has lapsed.
// We shaved one increment off of the equivalent logic in TestEndpointAuthorizeTokenBasicWithExpiresIn
// so this loop should have one fewer iteration.
for i := 0; i < 3; i++ {
req, _ := http.NewRequest(http.MethodGet, e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
if tokenExchanges != 1 {
t.Fatalf("Unexpected number of token exchanges, want: 1, got %d (iteration: %d)", tokenExchanges, i)
}
clock.current = clock.current.Add(timeIncrement)
}
// After we've exceeded the expiration, we should see a second token exchange.
req, _ := http.NewRequest(http.MethodGet, e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
if tokenExchanges != 2 {
t.Fatalf("Unexpected number of token exchanges, want: 2, got %d", tokenExchanges)
}
}
func TestEndpointAuthorizeBasic(t *testing.T) {
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/hello",
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
},
},
})
username := "user1"
password := "funSecretPa$$word"
authenicate := "Basic realm=localhost"
validCheck := func(a string) bool {
return a == fmt.Sprintf("Basic %s", basicAuth(username, password))
}
e, c := testServerWithAuth(m, authenicate, validCheck)
defer c()
creds := &testCredentialStore{
username: username,
password: password,
}
challengeManager := challenge.NewSimpleManager()
_, err := ping(challengeManager, e+"/v2/", "")
if err != nil {
t.Fatal(err)
}
transport1 := transport.NewTransport(nil, NewAuthorizer(challengeManager, NewBasicHandler(creds)))
client := &http.Client{Transport: transport1}
req, _ := http.NewRequest(http.MethodGet, e+"/v2/hello", nil)
resp, err := client.Do(req)
if err != nil {
t.Fatalf("Error sending get request: %s", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
t.Fatalf("Unexpected status code: %d, expected %d", resp.StatusCode, http.StatusAccepted)
}
}

View File

@@ -0,0 +1,167 @@
package client
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"time"
"github.com/distribution/distribution/v3"
)
type httpBlobUpload struct {
ctx context.Context
statter distribution.BlobStatter
client *http.Client
uuid string
startedAt time.Time
location string // always the last value of the location header.
offset int64
closed bool
}
func (hbu *httpBlobUpload) Reader() (io.ReadCloser, error) {
panic("Not implemented")
}
func (hbu *httpBlobUpload) handleErrorResponse(resp *http.Response) error {
if resp.StatusCode == http.StatusNotFound {
return distribution.ErrBlobUploadUnknown
}
return HandleHTTPResponseError(resp)
}
func (hbu *httpBlobUpload) ReadFrom(r io.Reader) (n int64, err error) {
req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPatch, hbu.location, io.NopCloser(r))
if err != nil {
return 0, err
}
defer req.Body.Close()
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := hbu.client.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if err := hbu.handleErrorResponse(resp); err != nil {
return 0, err
}
hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
if err != nil {
return 0, err
}
rng := resp.Header.Get("Range")
var start, end int64
if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
return 0, err
} else if n != 2 || end < start {
return 0, fmt.Errorf("bad range format: %s", rng)
}
hbu.offset += end - start + 1
return (end - start + 1), nil
}
func (hbu *httpBlobUpload) Write(p []byte) (n int, err error) {
req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPatch, hbu.location, bytes.NewReader(p))
if err != nil {
return 0, err
}
req.Header.Set("Content-Range", fmt.Sprintf("%d-%d", hbu.offset, hbu.offset+int64(len(p)-1)))
req.Header.Set("Content-Length", fmt.Sprintf("%d", len(p)))
req.Header.Set("Content-Type", "application/octet-stream")
resp, err := hbu.client.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if err := hbu.handleErrorResponse(resp); err != nil {
return 0, err
}
hbu.uuid = resp.Header.Get("Docker-Upload-UUID")
hbu.location, err = sanitizeLocation(resp.Header.Get("Location"), hbu.location)
if err != nil {
return 0, err
}
rng := resp.Header.Get("Range")
var start, end int
if n, err := fmt.Sscanf(rng, "%d-%d", &start, &end); err != nil {
return 0, err
} else if n != 2 || end < start {
return 0, fmt.Errorf("bad range format: %s", rng)
}
hbu.offset += int64(end - start + 1)
return (end - start + 1), nil
}
func (hbu *httpBlobUpload) Size() int64 {
return hbu.offset
}
func (hbu *httpBlobUpload) ID() string {
return hbu.uuid
}
func (hbu *httpBlobUpload) StartedAt() time.Time {
return hbu.startedAt
}
func (hbu *httpBlobUpload) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) {
// TODO(dmcgowan): Check if already finished, if so just fetch
req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPut, hbu.location, nil)
if err != nil {
return distribution.Descriptor{}, err
}
values := req.URL.Query()
values.Set("digest", desc.Digest.String())
req.URL.RawQuery = values.Encode()
resp, err := hbu.client.Do(req)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
if err := hbu.handleErrorResponse(resp); err != nil {
return distribution.Descriptor{}, err
}
return hbu.statter.Stat(ctx, desc.Digest)
}
func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
req, err := http.NewRequestWithContext(hbu.ctx, http.MethodDelete, hbu.location, nil)
if err != nil {
return err
}
resp, err := hbu.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return nil
}
return hbu.handleErrorResponse(resp)
}
func (hbu *httpBlobUpload) Close() error {
hbu.closed = true
return nil
}

View File

@@ -0,0 +1,502 @@
package client
import (
"bytes"
"context"
"fmt"
"net/http"
"testing"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/registry/api/errcode"
"github.com/distribution/distribution/v3/testutil"
)
// Test implements distribution.BlobWriter
var _ distribution.BlobWriter = &httpBlobUpload{}
func TestUploadReadFrom(t *testing.T) {
_, b := newRandomBlob(64)
repo := "test/upload/readfrom"
locationPath := fmt.Sprintf("/v2/%s/uploads/testid", repo)
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/",
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Docker-Distribution-API-Version": {"registry/2.0"},
}),
},
},
// Test Valid case
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
"Location": {locationPath},
"Range": {"0-63"},
}),
},
},
// Test invalid range
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
"Location": {locationPath},
"Range": {""},
}),
},
},
// Test 404
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusNotFound,
},
},
// Test 400 valid json
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusBadRequest,
Headers: http.Header{"Content-Type": []string{"application/json; charset=utf-8"}},
Body: []byte(`
{ "errors":
[
{
"code": "BLOB_UPLOAD_INVALID",
"message": "blob upload invalid",
"detail": "more detail"
}
]
} `),
},
},
// Test 400 invalid json
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusBadRequest,
Headers: http.Header{"Content-Type": []string{"application/json"}},
Body: []byte("something bad happened"),
},
},
// Test 500
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusInternalServerError,
},
},
})
e, c := testServer(m)
defer c()
blobUpload := &httpBlobUpload{
ctx: context.Background(),
client: &http.Client{},
}
// Valid case
blobUpload.location = e + locationPath
n, err := blobUpload.ReadFrom(bytes.NewReader(b))
if err != nil {
t.Fatalf("Error calling ReadFrom: %s", err)
}
if n != 64 {
t.Fatalf("Wrong length returned from ReadFrom: %d, expected 64", n)
}
// Bad range
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when bad range received")
}
// 404
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when not found")
}
if err != distribution.ErrBlobUploadUnknown {
t.Fatalf("Wrong error thrown: %s, expected %s", err, distribution.ErrBlobUploadUnknown)
}
// 400 valid json
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(errcode.Errors); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else if len(uploadErr) != 1 {
t.Fatalf("Unexpected number of errors: %d, expected 1", len(uploadErr))
} else {
v2Err, ok := uploadErr[0].(errcode.Error)
if !ok {
t.Fatalf("Not an 'Error' type: %#v", uploadErr[0])
}
if v2Err.Code != errcode.ErrorCodeBlobUploadInvalid {
t.Fatalf("Unexpected error code: %s, expected %d", v2Err.Code.String(), errcode.ErrorCodeBlobUploadInvalid)
}
if expected := "blob upload invalid"; v2Err.Message != expected {
t.Fatalf("Unexpected error message: %q, expected %q", v2Err.Message, expected)
}
if expected := "more detail"; v2Err.Detail.(string) != expected {
t.Fatalf("Unexpected error message: %q, expected %q", v2Err.Detail.(string), expected)
}
}
// 400 invalid json
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(*UnexpectedHTTPResponseError); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else {
respStr := string(uploadErr.Response)
if expected := "something bad happened"; respStr != expected {
t.Fatalf("Unexpected response string: %s, expected: %s", respStr, expected)
}
}
// 500
blobUpload.location = e + locationPath
_, err = blobUpload.ReadFrom(bytes.NewReader(b))
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(*UnexpectedHTTPStatusError); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else if expected := "500 " + http.StatusText(http.StatusInternalServerError); uploadErr.Status != expected {
t.Fatalf("Unexpected response status: %s, expected %s", uploadErr.Status, expected)
}
}
func TestUploadSize(t *testing.T) {
_, b := newRandomBlob(64)
readFromLocationPath := "/v2/test/upload/readfrom/uploads/testid"
writeLocationPath := "/v2/test/upload/readfrom/uploads/testid"
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/",
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Docker-Distribution-API-Version": {"registry/2.0"},
}),
},
},
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: readFromLocationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
"Location": {readFromLocationPath},
"Range": {"0-63"},
}),
},
},
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: writeLocationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
"Location": {writeLocationPath},
"Range": {"0-63"},
}),
},
},
})
e, c := testServer(m)
defer c()
// Writing with ReadFrom
blobUpload := &httpBlobUpload{
ctx: context.Background(),
client: &http.Client{},
location: e + readFromLocationPath,
}
if blobUpload.Size() != 0 {
t.Fatalf("Wrong size returned from Size: %d, expected 0", blobUpload.Size())
}
_, err := blobUpload.ReadFrom(bytes.NewReader(b))
if err != nil {
t.Fatalf("Error calling ReadFrom: %s", err)
}
if blobUpload.Size() != 64 {
t.Fatalf("Wrong size returned from Size: %d, expected 64", blobUpload.Size())
}
// Writing with Write
blobUpload = &httpBlobUpload{
ctx: context.Background(),
client: &http.Client{},
location: e + writeLocationPath,
}
_, err = blobUpload.Write(b)
if err != nil {
t.Fatalf("Error calling Write: %s", err)
}
if blobUpload.Size() != 64 {
t.Fatalf("Wrong size returned from Size: %d, expected 64", blobUpload.Size())
}
}
func TestUploadWrite(t *testing.T) {
_, b := newRandomBlob(64)
repo := "test/upload/write"
locationPath := fmt.Sprintf("/v2/%s/uploads/testid", repo)
m := testutil.RequestResponseMap([]testutil.RequestResponseMapping{
{
Request: testutil.Request{
Method: http.MethodGet,
Route: "/v2/",
},
Response: testutil.Response{
StatusCode: http.StatusOK,
Headers: http.Header(map[string][]string{
"Docker-Distribution-API-Version": {"registry/2.0"},
}),
},
},
// Test Valid case
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
"Location": {locationPath},
"Range": {"0-63"},
}),
},
},
// Test invalid range
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusAccepted,
Headers: http.Header(map[string][]string{
"Docker-Upload-UUID": {"46603072-7a1b-4b41-98f9-fd8a7da89f9b"},
"Location": {locationPath},
"Range": {""},
}),
},
},
// Test 404
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusNotFound,
},
},
// Test 400 valid json
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusBadRequest,
Headers: http.Header{"Content-Type": []string{"application/json; charset=utf-8"}},
Body: []byte(`
{ "errors":
[
{
"code": "BLOB_UPLOAD_INVALID",
"message": "blob upload invalid",
"detail": "more detail"
}
]
} `),
},
},
// Test 400 invalid json
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusBadRequest,
Headers: http.Header{"Content-Type": []string{"application/json"}},
Body: []byte("something bad happened"),
},
},
// Test 500
{
Request: testutil.Request{
Method: http.MethodPatch,
Route: locationPath,
Body: b,
},
Response: testutil.Response{
StatusCode: http.StatusInternalServerError,
},
},
})
e, c := testServer(m)
defer c()
blobUpload := &httpBlobUpload{
ctx: context.Background(),
client: &http.Client{},
}
// Valid case
blobUpload.location = e + locationPath
n, err := blobUpload.Write(b)
if err != nil {
t.Fatalf("Error calling Write: %s", err)
}
if n != 64 {
t.Fatalf("Wrong length returned from Write: %d, expected 64", n)
}
// Bad range
blobUpload.location = e + locationPath
_, err = blobUpload.Write(b)
if err == nil {
t.Fatalf("Expected error when bad range received")
}
// 404
blobUpload.location = e + locationPath
_, err = blobUpload.Write(b)
if err == nil {
t.Fatalf("Expected error when not found")
}
if err != distribution.ErrBlobUploadUnknown {
t.Fatalf("Wrong error thrown: %s, expected %s", err, distribution.ErrBlobUploadUnknown)
}
// 400 valid json
blobUpload.location = e + locationPath
_, err = blobUpload.Write(b)
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(errcode.Errors); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else if len(uploadErr) != 1 {
t.Fatalf("Unexpected number of errors: %d, expected 1", len(uploadErr))
} else {
v2Err, ok := uploadErr[0].(errcode.Error)
if !ok {
t.Fatalf("Not an 'Error' type: %#v", uploadErr[0])
}
if v2Err.Code != errcode.ErrorCodeBlobUploadInvalid {
t.Fatalf("Unexpected error code: %s, expected %d", v2Err.Code.String(), errcode.ErrorCodeBlobUploadInvalid)
}
if expected := "blob upload invalid"; v2Err.Message != expected {
t.Fatalf("Unexpected error message: %q, expected %q", v2Err.Message, expected)
}
if expected := "more detail"; v2Err.Detail.(string) != expected {
t.Fatalf("Unexpected error message: %q, expected %q", v2Err.Detail.(string), expected)
}
}
// 400 invalid json
blobUpload.location = e + locationPath
_, err = blobUpload.Write(b)
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(*UnexpectedHTTPResponseError); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else {
respStr := string(uploadErr.Response)
if expected := "something bad happened"; respStr != expected {
t.Fatalf("Unexpected response string: %s, expected: %s", respStr, expected)
}
}
// 500
blobUpload.location = e + locationPath
_, err = blobUpload.Write(b)
if err == nil {
t.Fatalf("Expected error when not found")
}
if uploadErr, ok := err.(*UnexpectedHTTPStatusError); !ok {
t.Fatalf("Wrong error type %T: %s", err, err)
} else if expected := "500 " + http.StatusText(http.StatusInternalServerError); uploadErr.Status != expected {
t.Fatalf("Unexpected response status: %s, expected %s", uploadErr.Status, expected)
}
}

180
internal/client/errors.go Normal file
View File

@@ -0,0 +1,180 @@
package client
import (
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"net/http"
"github.com/distribution/distribution/v3/internal/client/auth/challenge"
"github.com/distribution/distribution/v3/registry/api/errcode"
)
// ErrNoErrorsInBody is returned when an HTTP response body parses to an empty
// errcode.Errors slice.
var ErrNoErrorsInBody = errors.New("no error details found in HTTP response body")
// UnexpectedHTTPStatusError is returned when an unexpected HTTP status is
// returned when making a registry api call.
type UnexpectedHTTPStatusError struct {
Status string
}
func (e *UnexpectedHTTPStatusError) Error() string {
return fmt.Sprintf("received unexpected HTTP status: %s", e.Status)
}
// UnexpectedHTTPResponseError is returned when an expected HTTP status code
// is returned, but the content was unexpected and failed to be parsed.
type UnexpectedHTTPResponseError struct {
ParseErr error
StatusCode int
Response []byte
}
func (e *UnexpectedHTTPResponseError) Error() string {
return fmt.Sprintf("error parsing HTTP %d response body: %s: %q", e.StatusCode, e.ParseErr.Error(), string(e.Response))
}
func parseHTTPErrorResponse(resp *http.Response) error {
var errors errcode.Errors
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
statusCode := resp.StatusCode
ctHeader := resp.Header.Get("Content-Type")
if ctHeader == "" {
return makeError(statusCode, string(body))
}
contentType, _, err := mime.ParseMediaType(ctHeader)
if err != nil {
return fmt.Errorf("failed parsing content-type: %w", err)
}
if contentType != "application/json" && contentType != "application/vnd.api+json" {
return makeError(statusCode, string(body))
}
// For backward compatibility, handle irregularly formatted
// messages that contain a "details" field.
var detailsErr struct {
Details string `json:"details"`
}
err = json.Unmarshal(body, &detailsErr)
if err == nil && detailsErr.Details != "" {
return makeError(statusCode, detailsErr.Details)
}
if err := json.Unmarshal(body, &errors); err != nil {
return &UnexpectedHTTPResponseError{
ParseErr: err,
StatusCode: statusCode,
Response: body,
}
}
if len(errors) == 0 {
// If there was no error specified in the body, return
// UnexpectedHTTPResponseError.
return &UnexpectedHTTPResponseError{
ParseErr: ErrNoErrorsInBody,
StatusCode: statusCode,
Response: body,
}
}
return errors
}
func makeError(statusCode int, details string) error {
switch statusCode {
case http.StatusUnauthorized:
return errcode.ErrorCodeUnauthorized.WithMessage(details)
case http.StatusForbidden:
return errcode.ErrorCodeDenied.WithMessage(details)
case http.StatusTooManyRequests:
return errcode.ErrorCodeTooManyRequests.WithMessage(details)
default:
return errcode.ErrorCodeUnknown.WithMessage(details)
}
}
func makeErrorList(err error) []error {
if errL, ok := err.(errcode.Errors); ok {
return []error(errL)
}
return []error{err}
}
func mergeErrors(err1, err2 error) error {
return errcode.Errors(append(makeErrorList(err1), makeErrorList(err2)...))
}
// HandleHTTPResponseError returns error parsed from HTTP response, if any.
// It returns nil if no error occurred (HTTP status 200-399), or an error
// for unsuccessful HTTP response codes (in the range 400 - 499 inclusive).
// If possible, it returns a typed error, but an UnexpectedHTTPStatusError
// is returned for response code outside the expected range (HTTP status < 200
// and > 500).
func HandleHTTPResponseError(resp *http.Response) error {
if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
return nil
}
if resp.StatusCode >= 400 && resp.StatusCode < 500 {
// Check for OAuth errors within the `WWW-Authenticate` header first
// See https://tools.ietf.org/html/rfc6750#section-3
for _, c := range challenge.ResponseChallenges(resp) {
if c.Scheme == "bearer" {
var err errcode.Error
// codes defined at https://tools.ietf.org/html/rfc6750#section-3.1
switch c.Parameters["error"] {
case "invalid_token":
err.Code = errcode.ErrorCodeUnauthorized
case "insufficient_scope":
err.Code = errcode.ErrorCodeDenied
default:
continue
}
if description := c.Parameters["error_description"]; description != "" {
err.Message = description
} else {
err.Message = err.Code.Message()
}
return mergeErrors(err, parseHTTPErrorResponse(resp))
}
}
err := parseHTTPErrorResponse(resp)
if uErr, ok := err.(*UnexpectedHTTPResponseError); ok && resp.StatusCode == 401 {
return errcode.ErrorCodeUnauthorized.WithDetail(uErr.Response)
}
return err
}
return &UnexpectedHTTPStatusError{Status: resp.Status}
}
// HandleErrorResponse returns error parsed from HTTP response for an
// unsuccessful HTTP response code (in the range 400 - 499 inclusive). An
// UnexpectedHTTPStatusError returned for response code outside of expected
// range.
//
// Deprecated: use [HandleHTTPResponseError] and check the error.
func HandleErrorResponse(resp *http.Response) error {
if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
return &UnexpectedHTTPStatusError{Status: resp.Status}
}
return HandleHTTPResponseError(resp)
}
// SuccessStatus returns true if the argument is a successful HTTP response
// code (in the range 200 - 399 inclusive).
//
// Deprecated: use [HandleHTTPResponseError] and check the error.
func SuccessStatus(status int) bool {
return status >= 200 && status <= 399
}

View File

@@ -0,0 +1,151 @@
package client
import (
"bytes"
"io"
"net/http"
"strings"
"testing"
)
type nopCloser struct {
io.Reader
}
func (nopCloser) Close() error { return nil }
func TestHandleHTTPResponseError200ValidBody(t *testing.T) {
response := &http.Response{
Status: "200 OK",
StatusCode: 200,
}
err := HandleHTTPResponseError(response)
if err != nil {
t.Errorf("Expected no error, got: %v", err)
}
}
func TestHandleHTTPResponseError401ValidBody(t *testing.T) {
json := `{"errors":[{"code":"UNAUTHORIZED","message":"action requires authentication"}]}`
response := &http.Response{
Status: "401 Unauthorized",
StatusCode: 401,
Body: nopCloser{bytes.NewBufferString(json)},
Header: http.Header{"Content-Type": []string{"application/json; charset=utf-8"}},
}
err := HandleHTTPResponseError(response)
expectedMsg := "unauthorized: action requires authentication"
if !strings.Contains(err.Error(), expectedMsg) {
t.Errorf("Expected %q, got: %q", expectedMsg, err.Error())
}
}
func TestHandleHTTPResponseError401WithInvalidBody(t *testing.T) {
json := "{invalid json}"
response := &http.Response{
Status: "401 Unauthorized",
StatusCode: 401,
Body: nopCloser{bytes.NewBufferString(json)},
Header: http.Header{"Content-Type": []string{"application/json; charset=utf-8"}},
}
err := HandleHTTPResponseError(response)
expectedMsg := "unauthorized: authentication required"
if !strings.Contains(err.Error(), expectedMsg) {
t.Errorf("Expected %q, got: %q", expectedMsg, err.Error())
}
}
func TestHandleHTTPResponseErrorExpectedStatusCode400ValidBody(t *testing.T) {
json := `{"errors":[{"code":"DIGEST_INVALID","message":"provided digest does not match"}]}`
response := &http.Response{
Status: "400 Bad Request",
StatusCode: 400,
Body: nopCloser{bytes.NewBufferString(json)},
Header: http.Header{"Content-Type": []string{"application/json"}},
}
err := HandleHTTPResponseError(response)
expectedMsg := "digest invalid: provided digest does not match"
if !strings.Contains(err.Error(), expectedMsg) {
t.Errorf("Expected %q, got: %q", expectedMsg, err.Error())
}
}
func TestHandleHTTPResponseErrorExpectedStatusCode404EmptyErrorSlice(t *testing.T) {
json := `{"randomkey": "randomvalue"}`
response := &http.Response{
Status: "404 Not Found",
StatusCode: 404,
Body: nopCloser{bytes.NewBufferString(json)},
Header: http.Header{"Content-Type": []string{"application/json; charset=utf-8"}},
}
err := HandleHTTPResponseError(response)
expectedMsg := `error parsing HTTP 404 response body: no error details found in HTTP response body: "{\"randomkey\": \"randomvalue\"}"`
if !strings.Contains(err.Error(), expectedMsg) {
t.Errorf("Expected %q, got: %q", expectedMsg, err.Error())
}
}
func TestHandleHTTPResponseErrorExpectedStatusCode404InvalidBody(t *testing.T) {
json := "{invalid json}"
response := &http.Response{
Status: "404 Not Found",
StatusCode: 404,
Body: nopCloser{bytes.NewBufferString(json)},
Header: http.Header{"Content-Type": []string{"application/json"}},
}
err := HandleHTTPResponseError(response)
expectedMsg := "error parsing HTTP 404 response body: invalid character 'i' looking for beginning of object key string: \"{invalid json}\""
if !strings.Contains(err.Error(), expectedMsg) {
t.Errorf("Expected %q, got: %q", expectedMsg, err.Error())
}
}
func TestHandleHTTPResponseErrorUnexpectedStatusCode501(t *testing.T) {
response := &http.Response{
Status: "501 Not Implemented",
StatusCode: 501,
Body: nopCloser{bytes.NewBufferString("{\"Error Encountered\" : \"Function not implemented.\"}")},
Header: http.Header{"Content-Type": []string{"application/json"}},
}
err := HandleHTTPResponseError(response)
expectedMsg := "received unexpected HTTP status: 501 Not Implemented"
if !strings.Contains(err.Error(), expectedMsg) {
t.Errorf("Expected %q, got: %q", expectedMsg, err.Error())
}
}
func TestHandleHTTPResponseErrorInsufficientPrivileges403(t *testing.T) {
json := `{"details":"requesting higher privileges than access token allows"}`
response := &http.Response{
Status: "403 Forbidden",
StatusCode: 403,
Body: nopCloser{bytes.NewBufferString(json)},
Header: http.Header{"Content-Type": []string{"application/json"}},
}
err := HandleHTTPResponseError(response)
expectedMsg := "denied: requesting higher privileges than access token allows"
if !strings.Contains(err.Error(), expectedMsg) {
t.Errorf("Expected %q, got: %q", expectedMsg, err.Error())
}
}
func TestHandleHTTPResponseErrorNonJson(t *testing.T) {
msg := `{"details":"requesting higher privileges than access token allows"}`
response := &http.Response{
Status: "403 Forbidden",
StatusCode: 403,
Body: nopCloser{bytes.NewBufferString(msg)},
}
err := HandleHTTPResponseError(response)
if !strings.Contains(err.Error(), msg) {
t.Errorf("Expected %q, got: %q", msg, err.Error())
}
}

View File

@@ -0,0 +1,936 @@
package client
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/internal/client/transport"
v2 "github.com/distribution/distribution/v3/registry/api/v2"
"github.com/distribution/distribution/v3/registry/storage/cache"
"github.com/distribution/distribution/v3/registry/storage/cache/memory"
"github.com/distribution/reference"
"github.com/opencontainers/go-digest"
)
// Registry provides an interface for calling Repositories, which returns a catalog of repositories.
type Registry interface {
Repositories(ctx context.Context, repos []string, last string) (n int, err error)
}
// checkHTTPRedirect is a callback that can manipulate redirected HTTP
// requests. It is used to preserve Accept and Range headers.
func checkHTTPRedirect(req *http.Request, via []*http.Request) error {
if len(via) >= 10 {
return errors.New("stopped after 10 redirects")
}
if len(via) > 0 {
for headerName, headerVals := range via[0].Header {
if headerName != "Accept" && headerName != "Range" {
continue
}
for _, val := range headerVals {
// Don't add to redirected request if redirected
// request already has a header with the same
// name and value.
hasValue := false
for _, existingVal := range req.Header[headerName] {
if existingVal == val {
hasValue = true
break
}
}
if !hasValue {
req.Header.Add(headerName, val)
}
}
}
}
return nil
}
// NewRegistry creates a registry namespace which can be used to get a listing of repositories
func NewRegistry(baseURL string, transport http.RoundTripper) (Registry, error) {
ub, err := v2.NewURLBuilderFromString(baseURL, false)
if err != nil {
return nil, err
}
client := &http.Client{
Transport: transport,
Timeout: 1 * time.Minute,
CheckRedirect: checkHTTPRedirect,
}
return &registry{
client: client,
ub: ub,
}, nil
}
type registry struct {
client *http.Client
ub *v2.URLBuilder
}
// Repositories returns a lexigraphically sorted catalog given a base URL. The 'entries' slice will be filled up to the size
// of the slice, starting at the value provided in 'last'. The number of entries will be returned along with io.EOF if there
// are no more entries
func (r *registry) Repositories(ctx context.Context, entries []string, last string) (int, error) {
values := buildCatalogValues(len(entries), last)
u, err := r.ub.BuildCatalogURL(values)
if err != nil {
return 0, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return 0, err
}
resp, err := r.client.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if err := HandleHTTPResponseError(resp); err != nil {
return 0, err
}
var ctlg struct {
Repositories []string `json:"repositories"`
}
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&ctlg); err != nil {
return 0, err
}
copy(entries, ctlg.Repositories)
numFilled := len(ctlg.Repositories)
if resp.Header.Get("Link") == "" {
return numFilled, io.EOF
}
return numFilled, nil
}
// NewRepository creates a new Repository for the given repository name and base URL.
func NewRepository(name reference.Named, baseURL string, transport http.RoundTripper) (distribution.Repository, error) {
ub, err := v2.NewURLBuilderFromString(baseURL, false)
if err != nil {
return nil, err
}
return &repository{
client: &http.Client{
Transport: transport,
CheckRedirect: checkHTTPRedirect,
// TODO(dmcgowan): create cookie jar
},
ub: ub,
name: name,
}, nil
}
type repository struct {
client *http.Client
ub *v2.URLBuilder
name reference.Named
}
func (r *repository) Named() reference.Named {
return r.name
}
func (r *repository) Blobs(ctx context.Context) distribution.BlobStore {
return &blobs{
name: r.name,
ub: r.ub,
client: r.client,
statter: cache.NewCachedBlobStatter(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize), &blobStatter{
name: r.name,
ub: r.ub,
client: r.client,
}),
}
}
func (r *repository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) {
// todo(richardscothern): options should be sent over the wire
return &manifests{
name: r.name,
ub: r.ub,
client: r.client,
etags: make(map[string]string),
}, nil
}
func (r *repository) Tags(ctx context.Context) distribution.TagService {
return &tags{
client: r.client,
ub: r.ub,
name: r.Named(),
}
}
// tags implements remote tagging operations.
type tags struct {
client *http.Client
ub *v2.URLBuilder
name reference.Named
}
// All returns all tags
func (t *tags) All(ctx context.Context) ([]string, error) {
listURLStr, err := t.ub.BuildTagsURL(t.name)
if err != nil {
return nil, err
}
listURL, err := url.Parse(listURLStr)
if err != nil {
return nil, err
}
var allTags []string
for {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, listURL.String(), nil)
if err != nil {
return nil, err
}
resp, err := t.client.Do(req)
if err != nil {
return allTags, err
}
defer resp.Body.Close()
if err := HandleHTTPResponseError(resp); err != nil {
return allTags, err
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return allTags, err
}
tagsResponse := struct {
Tags []string `json:"tags"`
}{}
if err := json.Unmarshal(b, &tagsResponse); err != nil {
return allTags, err
}
allTags = append(allTags, tagsResponse.Tags...)
if link := resp.Header.Get("Link"); link != "" {
firsLink, _, _ := strings.Cut(link, ";")
linkURL, err := url.Parse(strings.Trim(firsLink, "<>"))
if err != nil {
return allTags, err
}
listURL = listURL.ResolveReference(linkURL)
} else {
return allTags, nil
}
}
}
func descriptorFromResponse(response *http.Response) (distribution.Descriptor, error) {
desc := distribution.Descriptor{}
headers := response.Header
ctHeader := headers.Get("Content-Type")
if ctHeader == "" {
return distribution.Descriptor{}, errors.New("missing or empty Content-Type header")
}
desc.MediaType = ctHeader
digestHeader := headers.Get("Docker-Content-Digest")
if digestHeader == "" {
data, err := io.ReadAll(response.Body)
if err != nil {
return distribution.Descriptor{}, err
}
_, desc, err := distribution.UnmarshalManifest(ctHeader, data)
if err != nil {
return distribution.Descriptor{}, err
}
return desc, nil
}
dgst, err := digest.Parse(digestHeader)
if err != nil {
return distribution.Descriptor{}, err
}
desc.Digest = dgst
lengthHeader := headers.Get("Content-Length")
if lengthHeader == "" {
return distribution.Descriptor{}, errors.New("missing or empty Content-Length header")
}
length, err := strconv.ParseInt(lengthHeader, 10, 64)
if err != nil {
return distribution.Descriptor{}, err
}
desc.Size = length
return desc, nil
}
// Get issues a HEAD request for a Manifest against its named endpoint in order
// to construct a descriptor for the tag. If the registry doesn't support HEADing
// a manifest, fallback to GET.
func (t *tags) Get(ctx context.Context, tag string) (distribution.Descriptor, error) {
ref, err := reference.WithTag(t.name, tag)
if err != nil {
return distribution.Descriptor{}, err
}
u, err := t.ub.BuildManifestURL(ref)
if err != nil {
return distribution.Descriptor{}, err
}
newRequest := func(method string) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, method, u, nil)
if err != nil {
return nil, err
}
for _, t := range distribution.ManifestMediaTypes() {
req.Header.Add("Accept", t)
}
resp, err := t.client.Do(req)
return resp, err
}
resp, err := newRequest(http.MethodHead)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
switch {
case resp.StatusCode >= 200 && resp.StatusCode < 400 && len(resp.Header.Get("Docker-Content-Digest")) > 0:
// if the response is a success AND a Docker-Content-Digest can be retrieved from the headers
return descriptorFromResponse(resp)
default:
// if the response is an error - there will be no body to decode.
// Issue a GET request:
// - for data from a server that does not handle HEAD
// - to get error details in case of a failure
resp, err = newRequest(http.MethodGet)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
if resp.StatusCode >= 200 && resp.StatusCode < 400 {
return descriptorFromResponse(resp)
}
return distribution.Descriptor{}, HandleHTTPResponseError(resp)
}
}
func (t *tags) Lookup(ctx context.Context, digest distribution.Descriptor) ([]string, error) {
panic("not implemented")
}
func (t *tags) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error {
panic("not implemented")
}
func (t *tags) Untag(ctx context.Context, tag string) error {
ref, err := reference.WithTag(t.name, tag)
if err != nil {
return err
}
u, err := t.ub.BuildManifestURL(ref)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u, nil)
if err != nil {
return err
}
resp, err := t.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return HandleHTTPResponseError(resp)
}
type manifests struct {
name reference.Named
ub *v2.URLBuilder
client *http.Client
etags map[string]string
}
func (ms *manifests) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
ref, err := reference.WithDigest(ms.name, dgst)
if err != nil {
return false, err
}
u, err := ms.ub.BuildManifestURL(ref)
if err != nil {
return false, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodHead, u, nil)
if err != nil {
return false, err
}
resp, err := ms.client.Do(req)
if err != nil {
return false, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return false, nil
}
if err := HandleHTTPResponseError(resp); err != nil {
return false, err
}
return true, nil
}
// AddEtagToTag allows a client to supply an eTag to Get which will be
// used for a conditional HTTP request. If the eTag matches, a nil manifest
// and ErrManifestNotModified error will be returned. etag is automatically
// quoted when added to this map.
func AddEtagToTag(tag, etag string) distribution.ManifestServiceOption {
return etagOption{tag, etag}
}
type etagOption struct{ tag, etag string }
func (o etagOption) Apply(ms distribution.ManifestService) error {
if ms, ok := ms.(*manifests); ok {
ms.etags[o.tag] = fmt.Sprintf(`"%s"`, o.etag)
return nil
}
return fmt.Errorf("etag options is a client-only option")
}
// ReturnContentDigest allows a client to set a the content digest on
// a successful request from the 'Docker-Content-Digest' header. This
// returned digest is represents the digest which the registry uses
// to refer to the content and can be used to delete the content.
func ReturnContentDigest(dgst *digest.Digest) distribution.ManifestServiceOption {
return contentDigestOption{dgst}
}
type contentDigestOption struct{ digest *digest.Digest }
func (o contentDigestOption) Apply(ms distribution.ManifestService) error {
return nil
}
func (ms *manifests) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) {
var (
digestOrTag string
ref reference.Named
err error
contentDgst *digest.Digest
mediaTypes []string
)
for _, option := range options {
switch opt := option.(type) {
case distribution.WithTagOption:
digestOrTag = opt.Tag
ref, err = reference.WithTag(ms.name, opt.Tag)
if err != nil {
return nil, err
}
case contentDigestOption:
contentDgst = opt.digest
case distribution.WithManifestMediaTypesOption:
mediaTypes = opt.MediaTypes
default:
err := option.Apply(ms)
if err != nil {
return nil, err
}
}
}
if digestOrTag == "" {
digestOrTag = dgst.String()
ref, err = reference.WithDigest(ms.name, dgst)
if err != nil {
return nil, err
}
}
if len(mediaTypes) == 0 {
mediaTypes = distribution.ManifestMediaTypes()
}
u, err := ms.ub.BuildManifestURL(ref)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, err
}
for _, t := range mediaTypes {
req.Header.Add("Accept", t)
}
if _, ok := ms.etags[digestOrTag]; ok {
req.Header.Set("If-None-Match", ms.etags[digestOrTag])
}
resp, err := ms.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotModified {
return nil, distribution.ErrManifestNotModified
}
if err := HandleHTTPResponseError(resp); err != nil {
return nil, err
}
if contentDgst != nil {
dgst, err := digest.Parse(resp.Header.Get("Docker-Content-Digest"))
if err == nil {
*contentDgst = dgst
}
}
mt := resp.Header.Get("Content-Type")
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
m, _, err := distribution.UnmarshalManifest(mt, body)
if err != nil {
return nil, err
}
return m, nil
}
// Put puts a manifest. A tag can be specified using an options parameter which uses some shared state to hold the
// tag name in order to build the correct upload URL.
func (ms *manifests) Put(ctx context.Context, m distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error) {
ref := ms.name
var tagged bool
for _, option := range options {
if opt, ok := option.(distribution.WithTagOption); ok {
var err error
ref, err = reference.WithTag(ref, opt.Tag)
if err != nil {
return "", err
}
tagged = true
} else {
err := option.Apply(ms)
if err != nil {
return "", err
}
}
}
mediaType, p, err := m.Payload()
if err != nil {
return "", err
}
if !tagged {
// generate a canonical digest and Put by digest
_, d, err := distribution.UnmarshalManifest(mediaType, p)
if err != nil {
return "", err
}
ref, err = reference.WithDigest(ref, d.Digest)
if err != nil {
return "", err
}
}
manifestURL, err := ms.ub.BuildManifestURL(ref)
if err != nil {
return "", err
}
putRequest, err := http.NewRequestWithContext(ctx, http.MethodPut, manifestURL, bytes.NewReader(p))
if err != nil {
return "", err
}
putRequest.Header.Set("Content-Type", mediaType)
resp, err := ms.client.Do(putRequest)
if err != nil {
return "", err
}
defer resp.Body.Close()
if err := HandleHTTPResponseError(resp); err != nil {
return "", err
}
dgst, err := digest.Parse(resp.Header.Get("Docker-Content-Digest"))
if err != nil {
return "", err
}
return dgst, nil
}
func (ms *manifests) Delete(ctx context.Context, dgst digest.Digest) error {
ref, err := reference.WithDigest(ms.name, dgst)
if err != nil {
return err
}
u, err := ms.ub.BuildManifestURL(ref)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, u, nil)
if err != nil {
return err
}
resp, err := ms.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return HandleHTTPResponseError(resp)
}
// todo(richardscothern): Restore interface and implementation with merge of #1050
/*func (ms *manifests) Enumerate(ctx context.Context, manifests []distribution.Manifest, last distribution.Manifest) (n int, err error) {
panic("not supported")
}*/
type blobs struct {
name reference.Named
ub *v2.URLBuilder
client *http.Client
statter distribution.BlobDescriptorService
distribution.BlobDeleter
}
func sanitizeLocation(location, base string) (string, error) {
baseURL, err := url.Parse(base)
if err != nil {
return "", err
}
locationURL, err := url.Parse(location)
if err != nil {
return "", err
}
return baseURL.ResolveReference(locationURL).String(), nil
}
func (bs *blobs) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
return bs.statter.Stat(ctx, dgst)
}
func (bs *blobs) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
reader, err := bs.Open(ctx, dgst)
if err != nil {
return nil, err
}
defer reader.Close()
return io.ReadAll(reader)
}
func (bs *blobs) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
ref, err := reference.WithDigest(bs.name, dgst)
if err != nil {
return nil, err
}
blobURL, err := bs.ub.BuildBlobURL(ref)
if err != nil {
return nil, err
}
return transport.NewHTTPReadSeeker(ctx, bs.client, blobURL, func(resp *http.Response) error {
if resp.StatusCode == http.StatusNotFound {
return distribution.ErrBlobUnknown
}
return HandleHTTPResponseError(resp)
}), nil
}
func (bs *blobs) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
desc, err := bs.statter.Stat(ctx, dgst)
if err != nil {
return err
}
w.Header().Set("Content-Length", strconv.FormatInt(desc.Size, 10))
w.Header().Set("Content-Type", desc.MediaType)
w.Header().Set("Docker-Content-Digest", dgst.String())
w.Header().Set("Etag", dgst.String())
if r.Method == http.MethodHead {
return nil
}
blob, err := bs.Open(ctx, dgst)
if err != nil {
return err
}
defer blob.Close()
_, err = io.CopyN(w, blob, desc.Size)
return err
}
func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
writer, err := bs.Create(ctx)
if err != nil {
return distribution.Descriptor{}, err
}
dgstr := digest.Canonical.Digester()
n, err := io.Copy(writer, io.TeeReader(bytes.NewReader(p), dgstr.Hash()))
if err != nil {
return distribution.Descriptor{}, err
}
if n < int64(len(p)) {
return distribution.Descriptor{}, fmt.Errorf("short copy: wrote %d of %d", n, len(p))
}
return writer.Commit(ctx, distribution.Descriptor{
MediaType: mediaType,
Size: int64(len(p)),
Digest: dgstr.Digest(),
})
}
type optionFunc func(interface{}) error
func (f optionFunc) Apply(v interface{}) error {
return f(v)
}
// WithMountFrom returns a BlobCreateOption which designates that the blob should be
// mounted from the given canonical reference.
func WithMountFrom(ref reference.Canonical) distribution.BlobCreateOption {
return optionFunc(func(v interface{}) error {
opts, ok := v.(*distribution.CreateOptions)
if !ok {
return fmt.Errorf("unexpected options type: %T", v)
}
opts.Mount.ShouldMount = true
opts.Mount.From = ref
return nil
})
}
func (bs *blobs) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) {
var opts distribution.CreateOptions
for _, option := range options {
err := option.Apply(&opts)
if err != nil {
return nil, err
}
}
var values []url.Values
if opts.Mount.ShouldMount {
values = append(values, url.Values{"from": {opts.Mount.From.Name()}, "mount": {opts.Mount.From.Digest().String()}})
}
u, err := bs.ub.BuildBlobUploadURL(bs.name, values...)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, nil)
if err != nil {
return nil, err
}
resp, err := bs.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusCreated:
desc, err := bs.statter.Stat(ctx, opts.Mount.From.Digest())
if err != nil {
return nil, err
}
return nil, distribution.ErrBlobMounted{From: opts.Mount.From, Descriptor: desc}
case http.StatusAccepted:
// TODO(dmcgowan): Check for invalid UUID
uuid := resp.Header.Get("Docker-Upload-UUID")
if uuid == "" {
// uuid is expected to be the last path element
_, uuid = path.Split(resp.Header.Get("Location"))
}
if uuid == "" {
return nil, errors.New("cannot retrieve docker upload UUID")
}
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
if err != nil {
return nil, err
}
return &httpBlobUpload{
ctx: ctx,
statter: bs.statter,
client: bs.client,
uuid: uuid,
startedAt: time.Now(),
location: location,
}, nil
default:
return nil, HandleHTTPResponseError(resp)
}
}
func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
location, err := bs.ub.BuildBlobUploadChunkURL(bs.name, id)
if err != nil {
return nil, err
}
return &httpBlobUpload{
ctx: ctx,
statter: bs.statter,
client: bs.client,
uuid: id,
startedAt: time.Now(),
location: location,
}, nil
}
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
return bs.statter.Clear(ctx, dgst)
}
type blobStatter struct {
name reference.Named
ub *v2.URLBuilder
client *http.Client
}
func (bs *blobStatter) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
ref, err := reference.WithDigest(bs.name, dgst)
if err != nil {
return distribution.Descriptor{}, err
}
u, err := bs.ub.BuildBlobURL(ref)
if err != nil {
return distribution.Descriptor{}, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodHead, u, nil)
if err != nil {
return distribution.Descriptor{}, err
}
resp, err := bs.client.Do(req)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
return distribution.Descriptor{}, distribution.ErrBlobUnknown
}
if err := HandleHTTPResponseError(resp); err != nil {
return distribution.Descriptor{}, err
}
lengthHeader := resp.Header.Get("Content-Length")
if lengthHeader == "" {
return distribution.Descriptor{}, fmt.Errorf("missing content-length header for request: %s", u)
}
length, err := strconv.ParseInt(lengthHeader, 10, 64)
if err != nil {
return distribution.Descriptor{}, fmt.Errorf("error parsing content-length: %v", err)
}
return distribution.Descriptor{
MediaType: resp.Header.Get("Content-Type"),
Size: length,
Digest: dgst,
}, nil
}
func buildCatalogValues(maxEntries int, last string) url.Values {
values := url.Values{}
if maxEntries > 0 {
values.Add("n", strconv.Itoa(maxEntries))
}
if last != "" {
values.Add("last", last)
}
return values
}
func (bs *blobStatter) Clear(ctx context.Context, dgst digest.Digest) error {
ref, err := reference.WithDigest(bs.name, dgst)
if err != nil {
return err
}
blobURL, err := bs.ub.BuildBlobURL(ref)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, blobURL, nil)
if err != nil {
return err
}
resp, err := bs.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
return HandleHTTPResponseError(resp)
}
func (bs *blobStatter) SetDescriptor(ctx context.Context, dgst digest.Digest, desc distribution.Descriptor) error {
return nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,288 @@
package transport
import (
"compress/flate"
"compress/gzip"
"context"
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strconv"
"strings"
"unicode"
"github.com/klauspost/compress/zstd"
)
var (
contentRangeRegexp = regexp.MustCompile(`bytes ([0-9]+)-([0-9]+)/([0-9]+|\\*)`)
// ErrWrongCodeForByteRange is returned if the client sends a request
// with a Range header but the server returns a 2xx or 3xx code other
// than 206 Partial Content.
ErrWrongCodeForByteRange = errors.New("expected HTTP 206 from byte range request")
)
// ReadSeekCloser combines io.ReadSeeker with io.Closer.
//
// Deprecated: use [io.ReadSeekCloser].
type ReadSeekCloser = io.ReadSeekCloser
// NewHTTPReadSeeker handles reading from an HTTP endpoint using a GET
// request. When seeking and starting a read from a non-zero offset
// the a "Range" header will be added which sets the offset.
//
// TODO(dmcgowan): Move this into a separate utility package
func NewHTTPReadSeeker(ctx context.Context, client *http.Client, url string, errorHandler func(*http.Response) error) *HTTPReadSeeker {
return &HTTPReadSeeker{
ctx: ctx,
client: client,
url: url,
errorHandler: errorHandler,
}
}
// HTTPReadSeeker implements an [io.ReadSeekCloser].
type HTTPReadSeeker struct {
ctx context.Context
client *http.Client
url string
// errorHandler creates an error from an unsuccessful HTTP response.
// This allows the error to be created with the HTTP response body
// without leaking the body through a returned error.
errorHandler func(*http.Response) error
size int64
// rc is the remote read closer.
rc io.ReadCloser
// readerOffset tracks the offset as of the last read.
readerOffset int64
// seekOffset allows Seek to override the offset. Seek changes
// seekOffset instead of changing readOffset directly so that
// connection resets can be delayed and possibly avoided if the
// seek is undone (i.e. seeking to the end and then back to the
// beginning).
seekOffset int64
err error
}
func (hrs *HTTPReadSeeker) Read(p []byte) (n int, err error) {
if hrs.err != nil {
return 0, hrs.err
}
// If we sought to a different position, we need to reset the
// connection. This logic is here instead of Seek so that if
// a seek is undone before the next read, the connection doesn't
// need to be closed and reopened. A common example of this is
// seeking to the end to determine the length, and then seeking
// back to the original position.
if hrs.readerOffset != hrs.seekOffset {
hrs.reset()
}
hrs.readerOffset = hrs.seekOffset
rd, err := hrs.reader()
if err != nil {
return 0, err
}
n, err = rd.Read(p)
hrs.seekOffset += int64(n)
hrs.readerOffset += int64(n)
return n, err
}
func (hrs *HTTPReadSeeker) Seek(offset int64, whence int) (int64, error) {
if hrs.err != nil {
return 0, hrs.err
}
lastReaderOffset := hrs.readerOffset
if whence == io.SeekStart && hrs.rc == nil {
// If no request has been made yet, and we are seeking to an
// absolute position, set the read offset as well to avoid an
// unnecessary request.
hrs.readerOffset = offset
}
_, err := hrs.reader()
if err != nil {
hrs.readerOffset = lastReaderOffset
return 0, err
}
newOffset := hrs.seekOffset
switch whence {
case io.SeekCurrent:
newOffset += offset
case io.SeekEnd:
if hrs.size < 0 {
return 0, errors.New("content length not known")
}
newOffset = hrs.size + offset
case io.SeekStart:
newOffset = offset
}
if newOffset < 0 {
err = errors.New("cannot seek to negative position")
} else {
hrs.seekOffset = newOffset
}
return hrs.seekOffset, err
}
func (hrs *HTTPReadSeeker) Close() error {
if hrs.err != nil {
return hrs.err
}
// close and release reader chain
if hrs.rc != nil {
hrs.rc.Close()
}
hrs.rc = nil
hrs.err = errors.New("httpLayer: closed")
return nil
}
func (hrs *HTTPReadSeeker) reset() {
if hrs.err != nil {
return
}
if hrs.rc != nil {
hrs.rc.Close()
hrs.rc = nil
}
}
func (hrs *HTTPReadSeeker) reader() (io.Reader, error) {
if hrs.err != nil {
return nil, hrs.err
}
if hrs.rc != nil {
return hrs.rc, nil
}
req, err := http.NewRequestWithContext(hrs.ctx, http.MethodGet, hrs.url, nil)
if err != nil {
return nil, err
}
if hrs.readerOffset > 0 {
// If we are at different offset, issue a range request from there.
req.Header.Add("Range", fmt.Sprintf("bytes=%d-", hrs.readerOffset))
// TODO: get context in here
// context.GetLogger(hrs.context).Infof("Range: %s", req.Header.Get("Range"))
}
req.Header.Add("Accept-Encoding", "zstd, gzip, deflate")
resp, err := hrs.client.Do(req)
if err != nil {
return nil, err
}
// Normally would use client.SuccessStatus, but that would be a cyclic
// import
if resp.StatusCode >= 200 && resp.StatusCode <= 399 {
if hrs.readerOffset > 0 {
if resp.StatusCode != http.StatusPartialContent {
return nil, ErrWrongCodeForByteRange
}
contentRange := resp.Header.Get("Content-Range")
if contentRange == "" {
return nil, errors.New("no Content-Range header found in HTTP 206 response")
}
submatches := contentRangeRegexp.FindStringSubmatch(contentRange)
if len(submatches) < 4 {
return nil, fmt.Errorf("could not parse Content-Range header: %s", contentRange)
}
startByte, err := strconv.ParseUint(submatches[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("could not parse start of range in Content-Range header: %s", contentRange)
}
if startByte != uint64(hrs.readerOffset) {
return nil, fmt.Errorf("received Content-Range starting at offset %d instead of requested %d", startByte, hrs.readerOffset)
}
endByte, err := strconv.ParseUint(submatches[2], 10, 64)
if err != nil {
return nil, fmt.Errorf("could not parse end of range in Content-Range header: %s", contentRange)
}
if submatches[3] == "*" {
hrs.size = -1
} else {
size, err := strconv.ParseUint(submatches[3], 10, 64)
if err != nil {
return nil, fmt.Errorf("could not parse total size in Content-Range header: %s", contentRange)
}
if endByte+1 != size {
return nil, fmt.Errorf("range in Content-Range stops before the end of the content: %s", contentRange)
}
hrs.size = int64(size)
}
} else if resp.StatusCode == http.StatusOK {
hrs.size = resp.ContentLength
} else {
hrs.size = -1
}
body := resp.Body
encoding := strings.FieldsFunc(resp.Header.Get("Content-Encoding"), func(r rune) bool {
return unicode.IsSpace(r) || r == ','
})
for i := len(encoding) - 1; i >= 0; i-- {
algorithm := strings.ToLower(encoding[i])
switch algorithm {
case "zstd":
r, err := zstd.NewReader(body)
if err != nil {
return nil, err
}
body = r.IOReadCloser()
case "gzip":
body, err = gzip.NewReader(body)
if err != nil {
return nil, err
}
case "deflate":
body = flate.NewReader(body)
case "":
// no content-encoding applied, use raw body
default:
return nil, errors.New("unsupported Content-Encoding algorithm: " + algorithm)
}
}
hrs.rc = body
} else {
defer resp.Body.Close()
if hrs.errorHandler != nil {
return nil, hrs.errorHandler(resp)
}
return nil, fmt.Errorf("unexpected status resolving reader: %v", resp.Status)
}
return hrs.rc, nil
}

View File

@@ -0,0 +1,148 @@
package transport
import (
"bytes"
"compress/flate"
"compress/gzip"
"context"
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/klauspost/compress/zstd"
)
func TestContentEncoding(t *testing.T) {
t.Parallel()
zstdDecode := func(in []byte) []byte {
var b bytes.Buffer
zw, err := zstd.NewWriter(&b)
if err != nil {
t.Fatal(err)
}
_, err = zw.Write(in)
if err != nil {
t.Fatal()
}
err = zw.Close()
if err != nil {
t.Fatal(err)
}
return b.Bytes()
}
gzipEncode := func(in []byte) []byte {
var b bytes.Buffer
gw := gzip.NewWriter(&b)
_, err := gw.Write(in)
if err != nil {
t.Fatal(err)
}
err = gw.Close()
if err != nil {
t.Fatal(err)
}
return b.Bytes()
}
flateEncode := func(in []byte) []byte {
var b bytes.Buffer
dw, err := flate.NewWriter(&b, -1)
if err != nil {
t.Fatal(err)
}
_, err = dw.Write(in)
if err != nil {
t.Fatal(err)
}
err = dw.Close()
if err != nil {
t.Fatal(err)
}
return b.Bytes()
}
tests := []struct {
encodingFuncs []func([]byte) []byte
encodingHeader string
}{
{
encodingFuncs: []func([]byte) []byte{},
encodingHeader: "",
},
{
encodingFuncs: []func([]byte) []byte{zstdDecode},
encodingHeader: "zstd",
},
{
encodingFuncs: []func([]byte) []byte{gzipEncode},
encodingHeader: "gzip",
},
{
encodingFuncs: []func([]byte) []byte{flateEncode},
encodingHeader: "deflate",
},
{
encodingFuncs: []func([]byte) []byte{zstdDecode, gzipEncode},
encodingHeader: "zstd,gzip",
},
{
encodingFuncs: []func([]byte) []byte{gzipEncode, flateEncode},
encodingHeader: "gzip,deflate",
},
{
encodingFuncs: []func([]byte) []byte{gzipEncode, zstdDecode},
encodingHeader: "gzip,zstd",
},
{
encodingFuncs: []func([]byte) []byte{gzipEncode, zstdDecode, flateEncode},
encodingHeader: "gzip,zstd,deflate",
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.encodingHeader, func(t *testing.T) {
t.Parallel()
content := make([]byte, 128)
rand.New(rand.NewSource(1)).Read(content)
s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
compressedContent := content
for _, enc := range tc.encodingFuncs {
compressedContent = enc(compressedContent)
}
rw.Header().Set("content-length", fmt.Sprintf("%d", len(compressedContent)))
rw.Header().Set("Content-Encoding", tc.encodingHeader)
_, _ = rw.Write(compressedContent)
}))
defer s.Close()
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}
rs := NewHTTPReadSeeker(context.TODO(), http.DefaultClient, u.String(), func(r *http.Response) error { return nil })
b, err := io.ReadAll(rs)
if err != nil {
t.Fatal(err)
}
expected := content
if len(b) != len(expected) {
t.Errorf("unexpected length %d, expected %d", len(b), len(expected))
return
}
for i, c := range expected {
if b[i] != c {
t.Errorf("unexpected byte %x at %d, expected %x", b[i], i, c)
return
}
}
})
}
}

View File

@@ -0,0 +1,155 @@
package transport
import (
"io"
"net/http"
"sync"
)
func identityTransportWrapper(rt http.RoundTripper) http.RoundTripper {
return rt
}
// DefaultTransportWrapper allows a user to wrap every generated transport
var DefaultTransportWrapper = identityTransportWrapper
// RequestModifier represents an object which will do an inplace
// modification of an HTTP request.
type RequestModifier interface {
ModifyRequest(*http.Request) error
}
type headerModifier http.Header
// NewHeaderRequestModifier returns a new RequestModifier which will
// add the given headers to a request.
func NewHeaderRequestModifier(header http.Header) RequestModifier {
return headerModifier(header)
}
func (h headerModifier) ModifyRequest(req *http.Request) error {
for k, s := range http.Header(h) {
req.Header[k] = append(req.Header[k], s...)
}
return nil
}
// NewTransport creates a new transport which will apply modifiers to
// the request on a RoundTrip call.
func NewTransport(base http.RoundTripper, modifiers ...RequestModifier) http.RoundTripper {
return DefaultTransportWrapper(
&transport{
Modifiers: modifiers,
Base: base,
})
}
// transport is an http.RoundTripper that makes HTTP requests after
// copying and modifying the request
type transport struct {
Modifiers []RequestModifier
Base http.RoundTripper
mu sync.Mutex // guards modReq
modReq map[*http.Request]*http.Request // original -> modified
}
// RoundTrip authorizes and authenticates the request with an
// access token. If no token exists or token is expired,
// tries to refresh/fetch a new token.
func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
req2 := cloneRequest(req)
for _, modifier := range t.Modifiers {
if err := modifier.ModifyRequest(req2); err != nil {
return nil, err
}
}
t.setModReq(req, req2)
res, err := t.base().RoundTrip(req2)
if err != nil {
t.setModReq(req, nil)
return nil, err
}
res.Body = &onEOFReader{
rc: res.Body,
fn: func() { t.setModReq(req, nil) },
}
return res, nil
}
// CancelRequest cancels an in-flight request by closing its connection.
func (t *transport) CancelRequest(req *http.Request) {
type canceler interface {
CancelRequest(*http.Request)
}
if cr, ok := t.base().(canceler); ok {
t.mu.Lock()
modReq := t.modReq[req]
delete(t.modReq, req)
t.mu.Unlock()
cr.CancelRequest(modReq)
}
}
func (t *transport) base() http.RoundTripper {
if t.Base != nil {
return t.Base
}
return http.DefaultTransport
}
func (t *transport) setModReq(orig, mod *http.Request) {
t.mu.Lock()
defer t.mu.Unlock()
if t.modReq == nil {
t.modReq = make(map[*http.Request]*http.Request)
}
if mod == nil {
delete(t.modReq, orig)
} else {
t.modReq[orig] = mod
}
}
// cloneRequest returns a clone of the provided *http.Request.
// The clone is a shallow copy of the struct and its Header map.
func cloneRequest(r *http.Request) *http.Request {
// shallow copy of the struct
r2 := new(http.Request)
*r2 = *r
// deep copy of the Header
r2.Header = make(http.Header, len(r.Header))
for k, s := range r.Header {
r2.Header[k] = append([]string(nil), s...)
}
return r2
}
type onEOFReader struct {
rc io.ReadCloser
fn func()
}
func (r *onEOFReader) Read(p []byte) (n int, err error) {
n, err = r.rc.Read(p)
if err == io.EOF {
r.runFunc()
}
return
}
func (r *onEOFReader) Close() error {
err := r.rc.Close()
r.runFunc()
return err
}
func (r *onEOFReader) runFunc() {
if fn := r.fn; fn != nil {
fn()
r.fn = nil
}
}