Fix Multizone pv creation on GCE

When Multizone is enabled static PV creation on GCE
fails because Cloud provider configuration is not
available in admission plugins.
This commit is contained in:
Hemant Kumar 2017-01-26 11:04:23 -05:00
parent 77733c2afd
commit b0581d688d
11 changed files with 80 additions and 11 deletions

View File

@ -299,7 +299,15 @@ func Run(s *options.ServerRunOptions) error {
} }
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",") admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer) var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile) admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
if err != nil { if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err) return fmt.Errorf("failed to read plugin config: %v", err)

View File

@ -21,6 +21,7 @@ package app
import ( import (
"fmt" "fmt"
"io/ioutil"
"strings" "strings"
"time" "time"
@ -168,7 +169,15 @@ func Run(s *options.ServerRunOptions) error {
} }
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",") admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer) var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile) admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
if err != nil { if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err) return fmt.Errorf("failed to read plugin config: %v", err)

View File

@ -51,10 +51,33 @@ var _ WantsAuthorizer = &WantAuthorizerAdmission{}
// TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer // TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer
// interface is implemented. // interface is implemented.
func TestWantsAuthorizer(t *testing.T) { func TestWantsAuthorizer(t *testing.T) {
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}) initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, nil)
wantAuthorizerAdmission := &WantAuthorizerAdmission{} wantAuthorizerAdmission := &WantAuthorizerAdmission{}
initializer.Initialize(wantAuthorizerAdmission) initializer.Initialize(wantAuthorizerAdmission)
if wantAuthorizerAdmission.auth == nil { if wantAuthorizerAdmission.auth == nil {
t.Errorf("expected authorizer to be initialized but found nil") t.Errorf("expected authorizer to be initialized but found nil")
} }
} }
type WantsCloudConfigAdmissionPlugin struct {
cloudConfig []byte
}
func (self *WantsCloudConfigAdmissionPlugin) SetCloudConfig(cloudConfig []byte) {
self.cloudConfig = cloudConfig
}
func (self *WantsCloudConfigAdmissionPlugin) Admit(a admission.Attributes) error { return nil }
func (self *WantsCloudConfigAdmissionPlugin) Handles(o admission.Operation) bool { return false }
func (self *WantsCloudConfigAdmissionPlugin) Validate() error { return nil }
func TestCloudConfigAdmissionPlugin(t *testing.T) {
cloudConfig := []byte("cloud-configuration")
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, cloudConfig)
wantsCloudConfigAdmission := &WantsCloudConfigAdmissionPlugin{}
initializer.Initialize(wantsCloudConfigAdmission)
if wantsCloudConfigAdmission.cloudConfig == nil {
t.Errorf("Expected cloud config to be initialized but found nil")
}
}

View File

@ -43,20 +43,27 @@ type WantsAuthorizer interface {
admission.Validator admission.Validator
} }
// WantsCloudConfig defines a function which sets CloudConfig for admission plugins that need it.
type WantsCloudConfig interface {
SetCloudConfig([]byte)
}
type pluginInitializer struct { type pluginInitializer struct {
internalClient internalclientset.Interface internalClient internalclientset.Interface
informers informers.SharedInformerFactory informers informers.SharedInformerFactory
authorizer authorizer.Authorizer authorizer authorizer.Authorizer
cloudConfig []byte
} }
var _ admission.PluginInitializer = pluginInitializer{} var _ admission.PluginInitializer = pluginInitializer{}
// NewPluginInitializer constructs new instance of PluginInitializer // NewPluginInitializer constructs new instance of PluginInitializer
func NewPluginInitializer(internalClient internalclientset.Interface, sharedInformers informers.SharedInformerFactory, authz authorizer.Authorizer) admission.PluginInitializer { func NewPluginInitializer(internalClient internalclientset.Interface, sharedInformers informers.SharedInformerFactory, authz authorizer.Authorizer, cloudConfig []byte) admission.PluginInitializer {
return pluginInitializer{ return pluginInitializer{
internalClient: internalClient, internalClient: internalClient,
informers: sharedInformers, informers: sharedInformers,
authorizer: authz, authorizer: authz,
cloudConfig: cloudConfig,
} }
} }
@ -74,4 +81,8 @@ func (i pluginInitializer) Initialize(plugin admission.Interface) {
if wants, ok := plugin.(WantsAuthorizer); ok { if wants, ok := plugin.(WantsAuthorizer); ok {
wants.SetAuthorizer(i.authorizer) wants.SetAuthorizer(i.authorizer)
} }
if wants, ok := plugin.(WantsCloudConfig); ok {
wants.SetCloudConfig(i.cloudConfig)
}
} }

View File

@ -595,7 +595,7 @@ func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.Sh
if err != nil { if err != nil {
return nil, f, err return nil, f, err
} }
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err = admission.Validate(handler) err = admission.Validate(handler)
return handler, f, err return handler, f, err

View File

@ -38,7 +38,7 @@ import (
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute) f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewProvision() handler := NewProvision()
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err := admission.Validate(handler) err := admission.Validate(handler)
return handler, f, err return handler, f, err

View File

@ -37,7 +37,7 @@ import (
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute) f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewExists() handler := NewExists()
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err := admission.Validate(handler) err := admission.Validate(handler)
return handler, f, err return handler, f, err

View File

@ -47,7 +47,7 @@ func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (
if err != nil { if err != nil {
return nil, f, err return nil, f, err
} }
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err = admission.Validate(handler) err = admission.Validate(handler)
return handler, f, err return handler, f, err

View File

@ -20,6 +20,7 @@ go_library(
"//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library", "//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/admission",

View File

@ -17,6 +17,7 @@ limitations under the License.
package label package label
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"sync" "sync"
@ -27,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws" "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
vol "k8s.io/kubernetes/pkg/volume" vol "k8s.io/kubernetes/pkg/volume"
) )
@ -44,9 +46,12 @@ type persistentVolumeLabel struct {
mutex sync.Mutex mutex sync.Mutex
ebsVolumes aws.Volumes ebsVolumes aws.Volumes
cloudConfig []byte
gceCloudProvider *gce.GCECloud gceCloudProvider *gce.GCECloud
} }
var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{}
// NewPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests, // NewPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests,
// based on the labels provided by the underlying cloud provider. // based on the labels provided by the underlying cloud provider.
// //
@ -57,6 +62,10 @@ func NewPersistentVolumeLabel() *persistentVolumeLabel {
} }
} }
func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
l.cloudConfig = cloudConfig
}
func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) { func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
if a.GetResource().GroupResource() != api.Resource("persistentvolumes") { if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
return nil return nil
@ -131,7 +140,11 @@ func (l *persistentVolumeLabel) getEBSVolumes() (aws.Volumes, error) {
defer l.mutex.Unlock() defer l.mutex.Unlock()
if l.ebsVolumes == nil { if l.ebsVolumes == nil {
cloudProvider, err := cloudprovider.GetCloudProvider("aws", nil) var cloudConfigReader io.Reader
if len(l.cloudConfig) > 0 {
cloudConfigReader = bytes.NewReader(l.cloudConfig)
}
cloudProvider, err := cloudprovider.GetCloudProvider("aws", cloudConfigReader)
if err != nil || cloudProvider == nil { if err != nil || cloudProvider == nil {
return nil, err return nil, err
} }
@ -176,7 +189,11 @@ func (l *persistentVolumeLabel) getGCECloudProvider() (*gce.GCECloud, error) {
defer l.mutex.Unlock() defer l.mutex.Unlock()
if l.gceCloudProvider == nil { if l.gceCloudProvider == nil {
cloudProvider, err := cloudprovider.GetCloudProvider("gce", nil) var cloudConfigReader io.Reader
if len(l.cloudConfig) > 0 {
cloudConfigReader = bytes.NewReader(l.cloudConfig)
}
cloudProvider, err := cloudprovider.GetCloudProvider("gce", cloudConfigReader)
if err != nil || cloudProvider == nil { if err != nil || cloudProvider == nil {
return nil, err return nil, err
} }

View File

@ -183,7 +183,7 @@ func TestHandles(t *testing.T) {
func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) { func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute) f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewPodNodeSelector(nil) handler := NewPodNodeSelector(nil)
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err := admission.Validate(handler) err := admission.Validate(handler)
return handler, f, err return handler, f, err