Drop AWS kubelet credential provider and clean up AWS storage e2e tests

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
Davanum Srinivas 2023-03-07 09:00:12 -05:00
parent 1af56548af
commit 90d185b7e1
7 changed files with 0 additions and 1193 deletions
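
With the in-tree provider gone, the kubelet no longer mints ECR credentials itself; clusters are expected to use the external credential provider mechanism instead (for AWS, the ecr-credential-provider binary from kubernetes/cloud-provider-aws, wired up through the kubelet's --image-credential-provider-config and --image-credential-provider-bin-dir flags). The sketch below is not that plugin; it is a minimal, hand-rolled illustration of the exec protocol such a provider implements, with locally defined stand-ins for the credentialprovider.kubelet.k8s.io/v1 wire types: the kubelet writes a CredentialProviderRequest to the plugin's stdin and reads a CredentialProviderResponse from its stdout.

package main

// Minimal sketch of a kubelet exec credential provider plugin, standing in
// for the out-of-tree replacement (e.g. ecr-credential-provider). The request
// and response structs are simplified local mirrors of the
// credentialprovider.kubelet.k8s.io/v1 wire format, not the published API types.
import (
	"encoding/json"
	"os"
)

type credentialProviderRequest struct {
	APIVersion string `json:"apiVersion"`
	Kind       string `json:"kind"`
	Image      string `json:"image"`
}

type authConfig struct {
	Username string `json:"username"`
	Password string `json:"password"`
}

type credentialProviderResponse struct {
	APIVersion    string                `json:"apiVersion"`
	Kind          string                `json:"kind"`
	CacheKeyType  string                `json:"cacheKeyType"`
	CacheDuration string                `json:"cacheDuration,omitempty"`
	Auth          map[string]authConfig `json:"auth"`
}

func main() {
	var req credentialProviderRequest
	if err := json.NewDecoder(os.Stdin).Decode(&req); err != nil {
		os.Exit(1)
	}
	// A real provider would parse the registry out of req.Image and call
	// ecr:GetAuthorizationToken, much like the removed in-tree code below;
	// this sketch only returns a placeholder credential.
	resp := credentialProviderResponse{
		APIVersion:    "credentialprovider.kubelet.k8s.io/v1",
		Kind:          "CredentialProviderResponse",
		CacheKeyType:  "Registry",
		CacheDuration: "6h0m0s",
		Auth: map[string]authConfig{
			"*.dkr.ecr.*.amazonaws.com": {Username: "AWS", Password: "<token from ECR>"},
		},
	}
	_ = json.NewEncoder(os.Stdout).Encode(&resp)
}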

View File

@@ -21,7 +21,6 @@ package app
import (
// Credential providers
_ "k8s.io/kubernetes/pkg/credentialprovider/aws"
_ "k8s.io/kubernetes/pkg/credentialprovider/azure"
_ "k8s.io/kubernetes/pkg/credentialprovider/gcp"

View File

@@ -1,4 +0,0 @@
# See the OWNERS docs at https://go.k8s.io/owners
reviewers:
- justinsb

View File

@@ -1,389 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package credentials
import (
"encoding/base64"
"errors"
"fmt"
"io/ioutil"
"net/url"
"os"
"regexp"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ecr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/component-base/version"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/credentialprovider"
)
var (
ecrPattern = regexp.MustCompile(`^(\d{12})\.dkr\.ecr(\-fips)?\.([a-zA-Z0-9][a-zA-Z0-9-_]*)\.(amazonaws\.com(\.cn)?|sc2s\.sgov\.gov|c2s\.ic\.gov)$`)
once sync.Once
isEC2 bool
)
// init registers a credential provider for each registryURLTemplate and creates
// an ECR token getter factory with a new cache to store token getters
func init() {
credentialprovider.RegisterCredentialProvider("amazon-ecr",
newECRProvider(&ecrTokenGetterFactory{cache: make(map[string]tokenGetter)},
ec2ValidationImpl,
))
}
// ecrProvider is a DockerConfigProvider that gets and refreshes tokens
// from AWS to access ECR.
type ecrProvider struct {
cache cache.Store
getterFactory tokenGetterFactory
isEC2 ec2ValidationFunc
}
var _ credentialprovider.DockerConfigProvider = &ecrProvider{}
func newECRProvider(getterFactory tokenGetterFactory, isEC2 ec2ValidationFunc) *ecrProvider {
return &ecrProvider{
cache: cache.NewExpirationStore(stringKeyFunc, &ecrExpirationPolicy{}),
getterFactory: getterFactory,
isEC2: isEC2,
}
}
// Enabled implements DockerConfigProvider.Enabled.
func (p *ecrProvider) Enabled() bool {
return true
}
type ec2ValidationFunc func() bool
// ec2ValidationImpl returns true if we detect
// an EC2 vm based on checking for the EC2 system UUID, the asset tag (for nitro
// instances), or instance credentials if the UUID is not present.
// Ref: https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html
func ec2ValidationImpl() bool {
return tryValidateEC2UUID() || tryValidateEC2Creds()
}
func tryValidateEC2UUID() bool {
hypervisor_uuid := "/sys/hypervisor/uuid"
product_uuid := "/sys/devices/virtual/dmi/id/product_uuid"
asset_tag := "/sys/devices/virtual/dmi/id/board_asset_tag"
if _, err := os.Stat(hypervisor_uuid); err == nil {
b, err := ioutil.ReadFile(hypervisor_uuid)
if err != nil {
klog.Errorf("error checking if this is an EC2 instance: %v", err)
} else if strings.HasPrefix(string(b), "EC2") || strings.HasPrefix(string(b), "ec2") {
klog.V(5).Infof("found 'ec2' in uuid %v from %v, enabling legacy AWS credential provider", string(b), hypervisor_uuid)
return true
}
}
if _, err := os.Stat(product_uuid); err == nil {
b, err := ioutil.ReadFile(product_uuid)
if err != nil {
klog.Errorf("error checking if this is an EC2 instance: %v", err)
} else if strings.HasPrefix(string(b), "EC2") || strings.HasPrefix(string(b), "ec2") {
klog.V(5).Infof("found 'ec2' in uuid %v from %v, enabling legacy AWS credential provider", string(b), product_uuid)
return true
}
}
if _, err := os.Stat(asset_tag); err == nil {
b, err := ioutil.ReadFile(asset_tag)
s := strings.TrimSpace(string(b))
if err != nil {
klog.Errorf("error checking if this is an EC2 instance: %v", err)
} else if strings.HasPrefix(s, "i-") && len(s) == 19 {
// Instance ID's are 19 characters plus newline
klog.V(5).Infof("found instance ID in %v from %v, enabling legacy AWS credential provider", string(b), asset_tag)
return true
}
}
return false
}
func tryValidateEC2Creds() bool {
sess, err := session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
klog.Errorf("while validating AWS credentials %v", err)
return false
}
if _, err := sess.Config.Credentials.Get(); err != nil {
klog.Errorf("while getting AWS credentials %v", err)
return false
}
klog.V(5).Infof("found aws credentials, enabling legacy AWS credential provider")
return true
}
// Provide returns a DockerConfig with credentials from the cache if they are
// found, or from ECR
func (p *ecrProvider) Provide(image string) credentialprovider.DockerConfig {
parsed, err := parseRepoURL(image)
if err != nil {
return credentialprovider.DockerConfig{}
}
// To prevent the AWS SDK from causing latency on non-aws platforms, only test if we are on
// EC2 or have access to credentials once. Attempt to do it without network calls by checking
// for certain EC2-specific files. Otherwise, we ask the SDK to initialize a session to see if
// credentials are available. On non-aws platforms, especially when a metadata endpoint is blocked,
// this has been shown to cause 20 seconds of latency due to SDK retries
// (see https://github.com/kubernetes/kubernetes/issues/92162)
once.Do(func() {
isEC2 = p.isEC2()
if isEC2 && credentialprovider.AreLegacyCloudCredentialProvidersDisabled() {
klog.V(4).Infof("AWS credential provider is now disabled. Please refer to sig-cloud-provider for guidance on external credential provider integration for AWS")
}
})
if !isEC2 {
return credentialprovider.DockerConfig{}
}
if credentialprovider.AreLegacyCloudCredentialProvidersDisabled() {
return credentialprovider.DockerConfig{}
}
if cfg, exists := p.getFromCache(parsed); exists {
klog.V(3).Infof("Got ECR credentials from cache for %s", parsed.registry)
return cfg
}
klog.V(3).Info("unable to get ECR credentials from cache, checking ECR API")
cfg, err := p.getFromECR(parsed)
if err != nil {
klog.Errorf("error getting credentials from ECR for %s %v", parsed.registry, err)
return credentialprovider.DockerConfig{}
}
klog.V(3).Infof("Got ECR credentials from ECR API for %s", parsed.registry)
return cfg
}
// getFromCache attempts to get credentials from the cache
func (p *ecrProvider) getFromCache(parsed *parsedURL) (credentialprovider.DockerConfig, bool) {
cfg := credentialprovider.DockerConfig{}
obj, exists, err := p.cache.GetByKey(parsed.registry)
if err != nil {
klog.Errorf("error getting ECR credentials from cache: %v", err)
return cfg, false
}
if !exists {
return cfg, false
}
entry := obj.(*cacheEntry)
cfg[entry.registry] = entry.credentials
return cfg, true
}
// getFromECR gets credentials from ECR since they are not in the cache
func (p *ecrProvider) getFromECR(parsed *parsedURL) (credentialprovider.DockerConfig, error) {
cfg := credentialprovider.DockerConfig{}
getter, err := p.getterFactory.GetTokenGetterForRegion(parsed.region)
if err != nil {
return cfg, err
}
params := &ecr.GetAuthorizationTokenInput{RegistryIds: []*string{aws.String(parsed.registryID)}}
output, err := getter.GetAuthorizationToken(params)
if err != nil {
return cfg, err
}
if output == nil {
return cfg, errors.New("authorization token is nil")
}
if len(output.AuthorizationData) == 0 {
return cfg, errors.New("authorization data from response is empty")
}
data := output.AuthorizationData[0]
if data.AuthorizationToken == nil {
return cfg, errors.New("authorization token in response is nil")
}
entry, err := makeCacheEntry(data, parsed.registry)
if err != nil {
return cfg, err
}
if err := p.cache.Add(entry); err != nil {
return cfg, err
}
cfg[entry.registry] = entry.credentials
return cfg, nil
}
type parsedURL struct {
registryID string
region string
registry string
}
// parseRepoURL parses and splits the registry URL into the registry ID,
// region, and registry.
// <registryID>.dkr.ecr(-fips).<region>.amazonaws.com(.cn)
func parseRepoURL(image string) (*parsedURL, error) {
parsed, err := url.Parse("https://" + image)
if err != nil {
return nil, fmt.Errorf("error parsing image %s %v", image, err)
}
splitURL := ecrPattern.FindStringSubmatch(parsed.Hostname())
if len(splitURL) == 0 {
return nil, fmt.Errorf("%s is not a valid ECR repository URL", parsed.Hostname())
}
return &parsedURL{
registryID: splitURL[1],
region: splitURL[3],
registry: parsed.Hostname(),
}, nil
}
// tokenGetter is for testing purposes
type tokenGetter interface {
GetAuthorizationToken(input *ecr.GetAuthorizationTokenInput) (*ecr.GetAuthorizationTokenOutput, error)
}
// tokenGetterFactory is for testing purposes
type tokenGetterFactory interface {
GetTokenGetterForRegion(string) (tokenGetter, error)
}
// ecrTokenGetterFactory stores a token getter per region
type ecrTokenGetterFactory struct {
cache map[string]tokenGetter
mutex sync.Mutex
}
// awsHandlerLogger is a handler that logs all AWS SDK requests
// Copied from pkg/cloudprovider/providers/aws/log_handler.go
func awsHandlerLogger(req *request.Request) {
service := req.ClientInfo.ServiceName
region := req.Config.Region
name := "?"
if req.Operation != nil {
name = req.Operation.Name
}
klog.V(3).Infof("AWS request: %s:%s in %s", service, name, *region)
}
func newECRTokenGetter(region string) (tokenGetter, error) {
sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{Region: aws.String(region)},
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
return nil, err
}
getter := &ecrTokenGetter{svc: ecr.New(sess)}
getter.svc.Handlers.Build.PushFrontNamed(request.NamedHandler{
Name: "k8s/user-agent",
Fn: request.MakeAddToUserAgentHandler("kubernetes", version.Get().String()),
})
getter.svc.Handlers.Sign.PushFrontNamed(request.NamedHandler{
Name: "k8s/logger",
Fn: awsHandlerLogger,
})
return getter, nil
}
// GetTokenGetterForRegion gets the token getter for the requested region. If it
// doesn't exist, it creates a new ECR token getter
func (f *ecrTokenGetterFactory) GetTokenGetterForRegion(region string) (tokenGetter, error) {
f.mutex.Lock()
defer f.mutex.Unlock()
if getter, ok := f.cache[region]; ok {
return getter, nil
}
getter, err := newECRTokenGetter(region)
if err != nil {
return nil, fmt.Errorf("unable to create token getter for region %v %v", region, err)
}
f.cache[region] = getter
return getter, nil
}
// The canonical implementation
type ecrTokenGetter struct {
svc *ecr.ECR
}
// GetAuthorizationToken gets the ECR authorization token using the ECR API
func (p *ecrTokenGetter) GetAuthorizationToken(input *ecr.GetAuthorizationTokenInput) (*ecr.GetAuthorizationTokenOutput, error) {
return p.svc.GetAuthorizationToken(input)
}
type cacheEntry struct {
expiresAt time.Time
credentials credentialprovider.DockerConfigEntry
registry string
}
// makeCacheEntry decodes the ECR authorization entry and re-packages it into a
// cacheEntry.
func makeCacheEntry(data *ecr.AuthorizationData, registry string) (*cacheEntry, error) {
decodedToken, err := base64.StdEncoding.DecodeString(aws.StringValue(data.AuthorizationToken))
if err != nil {
return nil, fmt.Errorf("error decoding ECR authorization token: %v", err)
}
parts := strings.SplitN(string(decodedToken), ":", 2)
if len(parts) < 2 {
return nil, errors.New("error getting username and password from authorization token")
}
creds := credentialprovider.DockerConfigEntry{
Username: parts[0],
Password: parts[1],
Email: "not@val.id", // ECR doesn't care and Docker is about to obsolete it
}
if data.ExpiresAt == nil {
return nil, errors.New("authorization data expiresAt is nil")
}
return &cacheEntry{
expiresAt: data.ExpiresAt.Add(-1 * wait.Jitter(30*time.Minute, 0.2)),
credentials: creds,
registry: registry,
}, nil
}
// ecrExpirationPolicy implements ExpirationPolicy from client-go.
type ecrExpirationPolicy struct{}
// stringKeyFunc returns the cache key as a string
func stringKeyFunc(obj interface{}) (string, error) {
key := obj.(*cacheEntry).registry
return key, nil
}
// IsExpired checks if the ECR credentials are expired.
func (p *ecrExpirationPolicy) IsExpired(entry *cache.TimestampedEntry) bool {
return time.Now().After(entry.Obj.(*cacheEntry).expiresAt)
}

View File

@@ -1,348 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package credentials
import (
"encoding/base64"
"fmt"
"math/rand"
"path"
"strconv"
"testing"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ecr"
"k8s.io/kubernetes/pkg/credentialprovider"
)
const user = "foo"
const password = "1234567890abcdef" // Fake value for testing.
const email = "not@val.id"
// Mock implementation
// randomizePassword is used to check for a cache hit to verify the password
// has not changed
type testTokenGetter struct {
user string
password string
endpoint string
randomizePassword bool
}
type testTokenGetterFactory struct {
getter tokenGetter
}
func (f *testTokenGetterFactory) GetTokenGetterForRegion(region string) (tokenGetter, error) {
return f.getter, nil
}
func (p *testTokenGetter) GetAuthorizationToken(input *ecr.GetAuthorizationTokenInput) (*ecr.GetAuthorizationTokenOutput, error) {
if p.randomizePassword {
rand.Seed(int64(time.Now().Nanosecond()))
p.password = strconv.Itoa(rand.Int())
}
expiration := time.Now().Add(1 * time.Hour)
// expiration := time.Now().Add(5 * time.Second) //for testing with the cache expiring
creds := []byte(fmt.Sprintf("%s:%s", p.user, p.password))
data := &ecr.AuthorizationData{
AuthorizationToken: aws.String(base64.StdEncoding.EncodeToString(creds)),
ExpiresAt: &expiration,
ProxyEndpoint: aws.String(p.endpoint),
}
output := &ecr.GetAuthorizationTokenOutput{
AuthorizationData: []*ecr.AuthorizationData{data},
}
return output, nil //p.svc.GetAuthorizationToken(input)
}
func TestRegistryPatternMatch(t *testing.T) {
grid := []struct {
Registry string
Expected bool
}{
{"123456789012.dkr.ecr.lala-land-1.amazonaws.com", true},
// fips
{"123456789012.dkr.ecr-fips.lala-land-1.amazonaws.com", true},
// .cn
{"123456789012.dkr.ecr.lala-land-1.amazonaws.com.cn", true},
// iso
{"123456789012.dkr.ecr.us-iso-east-1.c2s.ic.gov", true},
// iso-b
{"123456789012.dkr.ecr.us-isob-east-1.sc2s.sgov.gov", true},
// invalid gov endpoint
{"123456789012.dkr.ecr.us-iso-east-1.amazonaws.gov", false},
// registry ID too long
{"1234567890123.dkr.ecr.lala-land-1.amazonaws.com", false},
// registry ID too short
{"12345678901.dkr.ecr.lala-land-1.amazonaws.com", false},
// registry ID has invalid chars
{"12345678901A.dkr.ecr.lala-land-1.amazonaws.com", false},
// region has invalid chars
{"123456789012.dkr.ecr.lala-land-1!.amazonaws.com", false},
// region starts with invalid char
{"123456789012.dkr.ecr.#lala-land-1.amazonaws.com", false},
// invalid host suffix
{"123456789012.dkr.ecr.lala-land-1.amazonaws.hacker.com", false},
// invalid host suffix
{"123456789012.dkr.ecr.lala-land-1.hacker.com", false},
// invalid host suffix
{"123456789012.dkr.ecr.lala-land-1.amazonaws.lol", false},
// without dkr
{"123456789012.dog.ecr.lala-land-1.amazonaws.com", false},
// without ecr
{"123456789012.dkr.cat.lala-land-1.amazonaws.com", false},
// without amazonaws
{"123456789012.dkr.cat.lala-land-1.awsamazon.com", false},
// too short
{"123456789012.lala-land-1.amazonaws.com", false},
}
for _, g := range grid {
actual := ecrPattern.MatchString(g.Registry)
if actual != g.Expected {
t.Errorf("unexpected pattern match value, want %v for %s", g.Expected, g.Registry)
}
}
}
func TestParseRepoURLPass(t *testing.T) {
registryID := "123456789012"
region := "lala-land-1"
port := "9001"
registry := "123456789012.dkr.ecr.lala-land-1.amazonaws.com"
image := path.Join(registry, port, "foo/bar")
parsedURL, err := parseRepoURL(image)
if err != nil {
t.Errorf("Could not parse URL: %s, err: %v", image, err)
}
if registryID != parsedURL.registryID {
t.Errorf("Unexpected registryID value, want: %s, got: %s", registryID, parsedURL.registryID)
}
if region != parsedURL.region {
t.Errorf("Unexpected region value, want: %s, got: %s", region, parsedURL.region)
}
if registry != parsedURL.registry {
t.Errorf("Unexpected registry value, want: %s, got: %s", registry, parsedURL.registry)
}
}
func TestParseRepoURLFail(t *testing.T) {
registry := "123456789012.foo.bar.baz"
image := path.Join(registry, "foo/bar")
parsedURL, err := parseRepoURL(image)
expectedErr := "123456789012.foo.bar.baz is not a valid ECR repository URL"
if err == nil {
t.Errorf("Should fail to parse URL %s", image)
}
if err.Error() != expectedErr {
t.Errorf("Unexpected error, want: %s, got: %v", expectedErr, err)
}
if parsedURL != nil {
t.Errorf("Expected parsedURL to be nil")
}
}
func isAlwaysEC2() bool {
return true
}
func TestECRProvide(t *testing.T) {
registry := "123456789012.dkr.ecr.lala-land-1.amazonaws.com"
otherRegistries := []string{
"123456789012.dkr.ecr.cn-foo-1.amazonaws.com.cn",
"private.registry.com",
"gcr.io",
}
image := path.Join(registry, "foo/bar")
p := newECRProvider(&testTokenGetterFactory{
getter: &testTokenGetter{
user: user,
password: password,
endpoint: registry,
},
}, isAlwaysEC2)
keyring := &credentialprovider.BasicDockerKeyring{}
keyring.Add(p.Provide(image))
// Verify that we get the expected username/password combo for
// an ECR image name.
creds, ok := keyring.Lookup(image)
if !ok {
t.Errorf("Didn't find expected URL: %s", image)
return
}
if len(creds) > 1 {
t.Errorf("Got more hits than expected: %s", creds)
}
cred := creds[0]
if user != cred.Username {
t.Errorf("Unexpected username value, want: %s, got: %s", user, cred.Username)
}
if password != creds[0].Password {
t.Errorf("Unexpected password value, want: %s, got: %s", password, cred.Password)
}
if email != creds[0].Email {
t.Errorf("Unexpected email value, want: %s, got: %s", email, cred.Email)
}
// Verify that we get an error for other images.
for _, otherRegistry := range otherRegistries {
image = path.Join(otherRegistry, "foo/bar")
_, ok = keyring.Lookup(image)
if ok {
t.Errorf("Unexpectedly found image: %s", image)
return
}
}
}
func TestECRProvideCached(t *testing.T) {
registry := "123456789012.dkr.ecr.lala-land-1.amazonaws.com"
p := newECRProvider(&testTokenGetterFactory{
getter: &testTokenGetter{
user: user,
password: password,
endpoint: registry,
randomizePassword: true,
},
}, isAlwaysEC2)
image1 := path.Join(registry, "foo/bar")
image2 := path.Join(registry, "bar/baz")
keyring := &credentialprovider.BasicDockerKeyring{}
keyring.Add(p.Provide(image1))
// time.Sleep(6 * time.Second) //for testing with the cache expiring
keyring.Add(p.Provide(image2))
// Verify that we get the credentials from the
// cache the second time
creds1, ok := keyring.Lookup(image1)
if !ok {
t.Errorf("Didn't find expected URL: %s", image1)
return
}
if len(creds1) != 2 {
t.Errorf("Got more hits than expected: %s", creds1)
}
if creds1[0].Password != creds1[1].Password {
t.Errorf("cached credentials do not match")
}
creds2, ok := keyring.Lookup(image2)
if !ok {
t.Errorf("Didn't find expected URL: %s", image1)
return
}
if len(creds2) != 2 {
t.Errorf("Got more hits than expected: %s", creds2)
}
if creds2[0].Password != creds2[1].Password {
t.Errorf("cached credentials do not match")
}
if creds1[0].Password != creds2[0].Password {
t.Errorf("cached credentials do not match")
}
}
func TestChinaECRProvide(t *testing.T) {
registry := "123456789012.dkr.ecr.cn-foo-1.amazonaws.com.cn"
otherRegistries := []string{
"123456789012.dkr.ecr.lala-land-1.amazonaws.com",
"private.registry.com",
"gcr.io",
}
image := path.Join(registry, "foo/bar")
p := newECRProvider(&testTokenGetterFactory{
getter: &testTokenGetter{
user: user,
password: password,
endpoint: registry,
},
}, isAlwaysEC2)
keyring := &credentialprovider.BasicDockerKeyring{}
keyring.Add(p.Provide(image))
// Verify that we get the expected username/password combo for
// an ECR image name.
creds, ok := keyring.Lookup(image)
if !ok {
t.Errorf("Didn't find expected URL: %s", image)
return
}
if len(creds) > 1 {
t.Errorf("Got more hits than expected: %s", creds)
}
cred := creds[0]
if user != cred.Username {
t.Errorf("Unexpected username value, want: %s, got: %s", user, cred.Username)
}
if password != cred.Password {
t.Errorf("Unexpected password value, want: %s, got: %s", password, cred.Password)
}
if email != cred.Email {
t.Errorf("Unexpected email value, want: %s, got: %s", email, cred.Email)
}
// Verify that we get an error for other images.
for _, otherRegistry := range otherRegistries {
image = path.Join(otherRegistry, image)
_, ok = keyring.Lookup(image)
if ok {
t.Errorf("Unexpectedly found image: %s", image)
return
}
}
}
func TestChinaECRProvideCached(t *testing.T) {
registry := "123456789012.dkr.ecr.cn-foo-1.amazonaws.com.cn"
p := newECRProvider(&testTokenGetterFactory{
getter: &testTokenGetter{
user: user,
password: password,
endpoint: registry,
randomizePassword: true,
},
}, isAlwaysEC2)
image := path.Join(registry, "foo/bar")
keyring := &credentialprovider.BasicDockerKeyring{}
keyring.Add(p.Provide(image))
// time.Sleep(6 * time.Second) //for testing with the cache expiring
keyring.Add(p.Provide(image))
// Verify that we get the credentials from the
// cache the second time
creds, ok := keyring.Lookup(image)
if !ok {
t.Errorf("Didn't find expected URL: %s", image)
return
}
if len(creds) != 2 {
t.Errorf("Got more hits than expected: %s", creds)
}
if creds[0].Password != creds[1].Password {
t.Errorf("cached credentials do not match")
}
}
func BenchmarkSetupLatency(b *testing.B) {
p := newECRProvider(&ecrTokenGetterFactory{cache: make(map[string]tokenGetter)}, ec2ValidationImpl)
_ = p.Enabled()
}

View File

@@ -25,9 +25,6 @@ import (
"google.golang.org/api/googleapi" "google.golang.org/api/googleapi"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/onsi/ginkgo/v2" "github.com/onsi/ginkgo/v2"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1" policyv1 "k8s.io/api/policy/v1"
@@ -523,23 +520,6 @@ func detachPD(nodeName types.NodeName, pdName string) error {
}
return err
} else if framework.TestContext.Provider == "aws" {
awsSession, err := session.NewSession()
if err != nil {
return fmt.Errorf("error creating session: %w", err)
}
client := ec2.New(awsSession)
tokens := strings.Split(pdName, "/")
awsVolumeID := tokens[len(tokens)-1]
request := ec2.DetachVolumeInput{
VolumeId: aws.String(awsVolumeID),
}
_, err = client.DetachVolume(&request)
if err != nil {
return fmt.Errorf("error detaching EBS volume: %w", err)
}
return nil
} else {
return fmt.Errorf("Provider does not support volume detaching")
}
@@ -558,20 +538,6 @@ func attachPD(nodeName types.NodeName, pdName string) error {
}
return err
} else if framework.TestContext.Provider == "aws" {
awsSession, err := session.NewSession()
if err != nil {
return fmt.Errorf("error creating session: %w", err)
}
client := ec2.New(awsSession)
tokens := strings.Split(pdName, "/")
awsVolumeID := tokens[len(tokens)-1]
ebsUtil := utils.NewEBSUtil(client)
err = ebsUtil.AttachDisk(awsVolumeID, string(nodeName))
if err != nil {
return fmt.Errorf("error attaching volume %s to node %s: %w", awsVolumeID, nodeName, err)
}
return nil
} else {
return fmt.Errorf("Provider does not support volume attaching")
}

View File

@@ -1,263 +0,0 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
"fmt"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
)
const (
volumeAttachmentStatusPollDelay = 2 * time.Second
volumeAttachmentStatusFactor = 2
volumeAttachmentStatusSteps = 6
// represents expected attachment status of a volume after attach
volumeAttachedStatus = "attached"
// represents expected attachment status of a volume after detach
volumeDetachedStatus = "detached"
)
// EBSUtil provides functions to interact with EBS volumes
type EBSUtil struct {
client *ec2.EC2
validDevices []string
}
// NewEBSUtil returns an instance of EBSUtil which can be used to
// to interact with EBS volumes
func NewEBSUtil(client *ec2.EC2) *EBSUtil {
ebsUtil := &EBSUtil{client: client}
validDevices := []string{}
for _, firstChar := range []rune{'b', 'c'} {
for i := 'a'; i <= 'z'; i++ {
dev := string([]rune{firstChar, i})
validDevices = append(validDevices, dev)
}
}
ebsUtil.validDevices = validDevices
return ebsUtil
}
// AttachDisk attaches an EBS volume to a node.
func (ebs *EBSUtil) AttachDisk(volumeID string, nodeName string) error {
instance, err := findInstanceByNodeName(nodeName, ebs.client)
if err != nil {
return fmt.Errorf("error finding node %s: %w", nodeName, err)
}
err = ebs.waitForAvailable(volumeID)
if err != nil {
return fmt.Errorf("error waiting volume %s to be available: %w", volumeID, err)
}
device, err := ebs.findFreeDevice(instance)
if err != nil {
return fmt.Errorf("error finding free device on node %s: %w", nodeName, err)
}
hostDevice := "/dev/xvd" + string(device)
attachInput := &ec2.AttachVolumeInput{
VolumeId: &volumeID,
InstanceId: instance.InstanceId,
Device: &hostDevice,
}
_, err = ebs.client.AttachVolume(attachInput)
if err != nil {
return fmt.Errorf("error attaching volume %s to node %s: %w", volumeID, nodeName, err)
}
return ebs.waitForAttach(volumeID)
}
func (ebs *EBSUtil) findFreeDevice(instance *ec2.Instance) (string, error) {
deviceMappings := map[string]string{}
for _, blockDevice := range instance.BlockDeviceMappings {
name := aws.StringValue(blockDevice.DeviceName)
name = strings.TrimPrefix(name, "/dev/sd")
name = strings.TrimPrefix(name, "/dev/xvd")
if len(name) < 1 || len(name) > 2 {
klog.Warningf("Unexpected EBS DeviceName: %q", aws.StringValue(blockDevice.DeviceName))
}
deviceMappings[name] = aws.StringValue(blockDevice.Ebs.VolumeId)
}
for _, device := range ebs.validDevices {
if _, found := deviceMappings[device]; !found {
return device, nil
}
}
return "", fmt.Errorf("no available device")
}
func (ebs *EBSUtil) waitForAttach(volumeID string) error {
backoff := wait.Backoff{
Duration: volumeAttachmentStatusPollDelay,
Factor: volumeAttachmentStatusFactor,
Steps: volumeAttachmentStatusSteps,
}
time.Sleep(volumeAttachmentStatusPollDelay)
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
info, err := ebs.describeVolume(volumeID)
if err != nil {
return false, err
}
if len(info.Attachments) > 1 {
// Shouldn't happen; log so we know if it is
klog.Warningf("Found multiple attachments for volume %q: %v", volumeID, info)
}
attachmentStatus := ""
for _, a := range info.Attachments {
if attachmentStatus != "" {
// Shouldn't happen; log so we know if it is
klog.Warningf("Found multiple attachments for volume %q: %v", volumeID, info)
}
if a.State != nil {
attachmentStatus = *a.State
} else {
// Shouldn't happen; log so we know if it is
klog.Warningf("Ignoring nil attachment state for volume %q: %v", volumeID, a)
}
}
if attachmentStatus == "" {
attachmentStatus = volumeDetachedStatus
}
if attachmentStatus == volumeAttachedStatus {
// Attachment is in requested state, finish waiting
return true, nil
}
return false, nil
})
return err
}
func (ebs *EBSUtil) waitForAvailable(volumeID string) error {
backoff := wait.Backoff{
Duration: volumeAttachmentStatusPollDelay,
Factor: volumeAttachmentStatusFactor,
Steps: volumeAttachmentStatusSteps,
}
time.Sleep(volumeAttachmentStatusPollDelay)
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
info, err := ebs.describeVolume(volumeID)
if err != nil {
return false, err
}
volumeState := aws.StringValue(info.State)
if volumeState != ec2.VolumeStateAvailable {
return false, nil
}
return true, nil
})
return err
}
// Gets the full information about this volume from the EC2 API
func (ebs *EBSUtil) describeVolume(volumeID string) (*ec2.Volume, error) {
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{&volumeID},
}
results := []*ec2.Volume{}
var nextToken *string
for {
response, err := ebs.client.DescribeVolumes(request)
if err != nil {
return nil, err
}
results = append(results, response.Volumes...)
nextToken = response.NextToken
if aws.StringValue(nextToken) == "" {
break
}
request.NextToken = nextToken
}
if len(results) == 0 {
return nil, fmt.Errorf("no volumes found")
}
if len(results) > 1 {
return nil, fmt.Errorf("multiple volumes found")
}
return results[0], nil
}
func newEc2Filter(name string, value string) *ec2.Filter {
filter := &ec2.Filter{
Name: aws.String(name),
Values: []*string{
aws.String(value),
},
}
return filter
}
func findInstanceByNodeName(nodeName string, cloud *ec2.EC2) (*ec2.Instance, error) {
filters := []*ec2.Filter{
newEc2Filter("private-dns-name", nodeName),
}
request := &ec2.DescribeInstancesInput{
Filters: filters,
}
instances, err := describeInstances(request, cloud)
if err != nil {
return nil, err
}
if len(instances) == 0 {
return nil, nil
}
if len(instances) > 1 {
return nil, fmt.Errorf("multiple instances found for name: %s", nodeName)
}
return instances[0], nil
}
func describeInstances(request *ec2.DescribeInstancesInput, cloud *ec2.EC2) ([]*ec2.Instance, error) {
// Instances are paged
results := []*ec2.Instance{}
var nextToken *string
for {
response, err := cloud.DescribeInstances(request)
if err != nil {
return nil, fmt.Errorf("error listing AWS instances: %w", err)
}
for _, reservation := range response.Reservations {
results = append(results, reservation.Instances...)
}
nextToken = response.NextToken
if nextToken == nil || len(*nextToken) == 0 {
break
}
request.NextToken = nextToken
}
return results, nil
}

View File

@@ -25,10 +25,6 @@ import (
"github.com/onsi/ginkgo/v2" "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega" "github.com/onsi/gomega"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1" rbacv1 "k8s.io/api/rbac/v1"
storagev1 "k8s.io/api/storage/v1" storagev1 "k8s.io/api/storage/v1"
@@ -58,63 +54,6 @@ const (
externalPluginName = "example.com/nfs"
)
// checkAWSEBS checks properties of an AWS EBS. Test framework does not
// instantiate full AWS provider, therefore we need use ec2 API directly.
func checkAWSEBS(volume *v1.PersistentVolume, volumeType string, encrypted bool) error {
diskName := volume.Spec.AWSElasticBlockStore.VolumeID
var client *ec2.EC2
tokens := strings.Split(diskName, "/")
volumeID := tokens[len(tokens)-1]
zone := framework.TestContext.CloudConfig.Zone
awsSession, err := session.NewSession()
if err != nil {
return fmt.Errorf("error creating session: %w", err)
}
if len(zone) > 0 {
region := zone[:len(zone)-1]
cfg := aws.Config{Region: &region}
framework.Logf("using region %s", region)
client = ec2.New(awsSession, &cfg)
} else {
framework.Logf("no region configured")
client = ec2.New(awsSession)
}
request := &ec2.DescribeVolumesInput{
VolumeIds: []*string{&volumeID},
}
info, err := client.DescribeVolumes(request)
if err != nil {
return fmt.Errorf("error querying ec2 for volume %q: %w", volumeID, err)
}
if len(info.Volumes) == 0 {
return fmt.Errorf("no volumes found for volume %q", volumeID)
}
if len(info.Volumes) > 1 {
return fmt.Errorf("multiple volumes found for volume %q", volumeID)
}
awsVolume := info.Volumes[0]
if awsVolume.VolumeType == nil {
return fmt.Errorf("expected volume type %q, got nil", volumeType)
}
if *awsVolume.VolumeType != volumeType {
return fmt.Errorf("expected volume type %q, got %q", volumeType, *awsVolume.VolumeType)
}
if encrypted && awsVolume.Encrypted == nil {
return fmt.Errorf("expected encrypted volume, got no encryption")
}
if encrypted && !*awsVolume.Encrypted {
return fmt.Errorf("expected encrypted volume, got %v", *awsVolume.Encrypted)
}
return nil
}
func checkGCEPD(volume *v1.PersistentVolume, volumeType string) error {
cloud, err := gce.GetGCECloud()
if err != nil {
@@ -191,99 +130,6 @@ var _ = utils.SIGDescribe("Dynamic Provisioning", func() {
framework.ExpectNoError(err, "checkGCEPD pd-standard")
},
},
// AWS
{
Name: "gp2 EBS on AWS",
CloudProviders: []string{"aws"},
Timeouts: f.Timeouts,
Provisioner: "kubernetes.io/aws-ebs",
Parameters: map[string]string{
"type": "gp2",
"zone": getRandomClusterZone(ctx, c),
},
ClaimSize: "1.5Gi",
ExpectedSize: "2Gi",
PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, f.Timeouts, claim, e2epod.NodeSelection{})
gomega.Expect(volume).NotTo(gomega.BeNil(), "get bound PV")
err := checkAWSEBS(volume, "gp2", false)
framework.ExpectNoError(err, "checkAWSEBS gp2")
},
},
{
Name: "io1 EBS on AWS",
CloudProviders: []string{"aws"},
Timeouts: f.Timeouts,
Provisioner: "kubernetes.io/aws-ebs",
Parameters: map[string]string{
"type": "io1",
"iopsPerGB": "50",
},
ClaimSize: "3.5Gi",
ExpectedSize: "4Gi", // 4 GiB is minimum for io1
PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, f.Timeouts, claim, e2epod.NodeSelection{})
gomega.Expect(volume).NotTo(gomega.BeNil(), "get bound PV")
err := checkAWSEBS(volume, "io1", false)
framework.ExpectNoError(err, "checkAWSEBS io1")
},
},
{
Name: "sc1 EBS on AWS",
CloudProviders: []string{"aws"},
Timeouts: f.Timeouts,
Provisioner: "kubernetes.io/aws-ebs",
Parameters: map[string]string{
"type": "sc1",
},
ClaimSize: "500Gi", // minimum for sc1
ExpectedSize: "500Gi",
PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, f.Timeouts, claim, e2epod.NodeSelection{})
gomega.Expect(volume).NotTo(gomega.BeNil(), "get bound PV")
err := checkAWSEBS(volume, "sc1", false)
framework.ExpectNoError(err, "checkAWSEBS sc1")
},
},
{
Name: "st1 EBS on AWS",
CloudProviders: []string{"aws"},
Timeouts: f.Timeouts,
Provisioner: "kubernetes.io/aws-ebs",
Parameters: map[string]string{
"type": "st1",
},
ClaimSize: "500Gi", // minimum for st1
ExpectedSize: "500Gi",
PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, f.Timeouts, claim, e2epod.NodeSelection{})
gomega.Expect(volume).NotTo(gomega.BeNil(), "get bound PV")
err := checkAWSEBS(volume, "st1", false)
framework.ExpectNoError(err, "checkAWSEBS st1")
},
},
{
Name: "encrypted EBS on AWS",
CloudProviders: []string{"aws"},
Timeouts: f.Timeouts,
Provisioner: "kubernetes.io/aws-ebs",
Parameters: map[string]string{
"encrypted": "true",
},
ClaimSize: "1Gi",
ExpectedSize: "1Gi",
PvCheck: func(ctx context.Context, claim *v1.PersistentVolumeClaim) {
volume := testsuites.PVWriteReadSingleNodeCheck(ctx, c, f.Timeouts, claim, e2epod.NodeSelection{})
gomega.Expect(volume).NotTo(gomega.BeNil(), "get bound PV")
err := checkAWSEBS(volume, "gp2", true)
framework.ExpectNoError(err, "checkAWSEBS gp2 encrypted")
},
},
// OpenStack generic tests (works on all OpenStack deployments)
{
Name: "generic Cinder volume on OpenStack",