Merge pull request #40537 from gnufied/fix-multizone-pv-breakage

Automatic merge from submit-queue

Fix Multizone pv creation on GCE

When Multizone is enabled static PV creation on GCE
fails because Cloud provider configuration is not
available in admission plugins.

cc @derekwaynecarr @childsb
This commit is contained in:
Kubernetes Submit Queue 2017-03-05 11:16:46 -08:00 committed by GitHub
commit df70b30e59
11 changed files with 80 additions and 11 deletions

View File

@ -332,7 +332,15 @@ func BuildMasterConfig(s *options.ServerRunOptions) (*master.Config, informers.S
} }
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",") admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer) var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}
pluginInitializer := kubeadmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile) admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to read plugin config: %v", err) return nil, nil, fmt.Errorf("failed to read plugin config: %v", err)

View File

@ -21,6 +21,7 @@ package app
import ( import (
"fmt" "fmt"
"io/ioutil"
"strings" "strings"
"time" "time"
@ -168,7 +169,15 @@ func Run(s *options.ServerRunOptions) error {
} }
admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",") admissionControlPluginNames := strings.Split(s.GenericServerRunOptions.AdmissionControl, ",")
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer) var cloudConfig []byte
if s.CloudProvider.CloudConfigFile != "" {
cloudConfig, err = ioutil.ReadFile(s.CloudProvider.CloudConfigFile)
if err != nil {
glog.Fatalf("Error reading from cloud configuration file %s: %#v", s.CloudProvider.CloudConfigFile, err)
}
}
pluginInitializer := kubeapiserveradmission.NewPluginInitializer(client, sharedInformers, apiAuthorizer, cloudConfig)
admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile) admissionConfigProvider, err := admission.ReadAdmissionConfiguration(admissionControlPluginNames, s.GenericServerRunOptions.AdmissionControlConfigFile)
if err != nil { if err != nil {
return fmt.Errorf("failed to read plugin config: %v", err) return fmt.Errorf("failed to read plugin config: %v", err)

View File

@ -51,10 +51,33 @@ var _ WantsAuthorizer = &WantAuthorizerAdmission{}
// TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer // TestWantsAuthorizer ensures that the authorizer is injected when the WantsAuthorizer
// interface is implemented. // interface is implemented.
func TestWantsAuthorizer(t *testing.T) { func TestWantsAuthorizer(t *testing.T) {
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}) initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, nil)
wantAuthorizerAdmission := &WantAuthorizerAdmission{} wantAuthorizerAdmission := &WantAuthorizerAdmission{}
initializer.Initialize(wantAuthorizerAdmission) initializer.Initialize(wantAuthorizerAdmission)
if wantAuthorizerAdmission.auth == nil { if wantAuthorizerAdmission.auth == nil {
t.Errorf("expected authorizer to be initialized but found nil") t.Errorf("expected authorizer to be initialized but found nil")
} }
} }
type WantsCloudConfigAdmissionPlugin struct {
cloudConfig []byte
}
func (self *WantsCloudConfigAdmissionPlugin) SetCloudConfig(cloudConfig []byte) {
self.cloudConfig = cloudConfig
}
func (self *WantsCloudConfigAdmissionPlugin) Admit(a admission.Attributes) error { return nil }
func (self *WantsCloudConfigAdmissionPlugin) Handles(o admission.Operation) bool { return false }
func (self *WantsCloudConfigAdmissionPlugin) Validate() error { return nil }
func TestCloudConfigAdmissionPlugin(t *testing.T) {
cloudConfig := []byte("cloud-configuration")
initializer := NewPluginInitializer(nil, nil, &TestAuthorizer{}, cloudConfig)
wantsCloudConfigAdmission := &WantsCloudConfigAdmissionPlugin{}
initializer.Initialize(wantsCloudConfigAdmission)
if wantsCloudConfigAdmission.cloudConfig == nil {
t.Errorf("Expected cloud config to be initialized but found nil")
}
}

View File

@ -43,20 +43,27 @@ type WantsAuthorizer interface {
admission.Validator admission.Validator
} }
// WantsCloudConfig defines a function which sets CloudConfig for admission plugins that need it.
type WantsCloudConfig interface {
SetCloudConfig([]byte)
}
type pluginInitializer struct { type pluginInitializer struct {
internalClient internalclientset.Interface internalClient internalclientset.Interface
informers informers.SharedInformerFactory informers informers.SharedInformerFactory
authorizer authorizer.Authorizer authorizer authorizer.Authorizer
cloudConfig []byte
} }
var _ admission.PluginInitializer = pluginInitializer{} var _ admission.PluginInitializer = pluginInitializer{}
// NewPluginInitializer constructs new instance of PluginInitializer // NewPluginInitializer constructs new instance of PluginInitializer
func NewPluginInitializer(internalClient internalclientset.Interface, sharedInformers informers.SharedInformerFactory, authz authorizer.Authorizer) admission.PluginInitializer { func NewPluginInitializer(internalClient internalclientset.Interface, sharedInformers informers.SharedInformerFactory, authz authorizer.Authorizer, cloudConfig []byte) admission.PluginInitializer {
return pluginInitializer{ return pluginInitializer{
internalClient: internalClient, internalClient: internalClient,
informers: sharedInformers, informers: sharedInformers,
authorizer: authz, authorizer: authz,
cloudConfig: cloudConfig,
} }
} }
@ -74,4 +81,8 @@ func (i pluginInitializer) Initialize(plugin admission.Interface) {
if wants, ok := plugin.(WantsAuthorizer); ok { if wants, ok := plugin.(WantsAuthorizer); ok {
wants.SetAuthorizer(i.authorizer) wants.SetAuthorizer(i.authorizer)
} }
if wants, ok := plugin.(WantsCloudConfig); ok {
wants.SetCloudConfig(i.cloudConfig)
}
} }

View File

@ -595,7 +595,7 @@ func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.Sh
if err != nil { if err != nil {
return nil, f, err return nil, f, err
} }
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err = admission.Validate(handler) err = admission.Validate(handler)
return handler, f, err return handler, f, err

View File

@ -38,7 +38,7 @@ import (
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute) f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewProvision() handler := NewProvision()
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err := admission.Validate(handler) err := admission.Validate(handler)
return handler, f, err return handler, f, err

View File

@ -37,7 +37,7 @@ import (
func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) { func newHandlerForTest(c clientset.Interface) (admission.Interface, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute) f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewExists() handler := NewExists()
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err := admission.Validate(handler) err := admission.Validate(handler)
return handler, f, err return handler, f, err

View File

@ -48,7 +48,7 @@ func newHandlerForTestWithClock(c clientset.Interface, cacheClock clock.Clock) (
if err != nil { if err != nil {
return nil, f, err return nil, f, err
} }
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err = admission.Validate(handler) err = admission.Validate(handler)
return handler, f, err return handler, f, err

View File

@ -20,6 +20,7 @@ go_library(
"//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library", "//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library", "//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apiserver/pkg/admission", "//vendor:k8s.io/apiserver/pkg/admission",

View File

@ -17,6 +17,7 @@ limitations under the License.
package label package label
import ( import (
"bytes"
"fmt" "fmt"
"io" "io"
"sync" "sync"
@ -27,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws" "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
vol "k8s.io/kubernetes/pkg/volume" vol "k8s.io/kubernetes/pkg/volume"
) )
@ -44,9 +46,12 @@ type persistentVolumeLabel struct {
mutex sync.Mutex mutex sync.Mutex
ebsVolumes aws.Volumes ebsVolumes aws.Volumes
cloudConfig []byte
gceCloudProvider *gce.GCECloud gceCloudProvider *gce.GCECloud
} }
var _ kubeapiserveradmission.WantsCloudConfig = &persistentVolumeLabel{}
// NewPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests, // NewPersistentVolumeLabel returns an admission.Interface implementation which adds labels to PersistentVolume CREATE requests,
// based on the labels provided by the underlying cloud provider. // based on the labels provided by the underlying cloud provider.
// //
@ -57,6 +62,10 @@ func NewPersistentVolumeLabel() *persistentVolumeLabel {
} }
} }
func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
l.cloudConfig = cloudConfig
}
func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) { func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
if a.GetResource().GroupResource() != api.Resource("persistentvolumes") { if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
return nil return nil
@ -131,7 +140,11 @@ func (l *persistentVolumeLabel) getEBSVolumes() (aws.Volumes, error) {
defer l.mutex.Unlock() defer l.mutex.Unlock()
if l.ebsVolumes == nil { if l.ebsVolumes == nil {
cloudProvider, err := cloudprovider.GetCloudProvider("aws", nil) var cloudConfigReader io.Reader
if len(l.cloudConfig) > 0 {
cloudConfigReader = bytes.NewReader(l.cloudConfig)
}
cloudProvider, err := cloudprovider.GetCloudProvider("aws", cloudConfigReader)
if err != nil || cloudProvider == nil { if err != nil || cloudProvider == nil {
return nil, err return nil, err
} }
@ -176,7 +189,11 @@ func (l *persistentVolumeLabel) getGCECloudProvider() (*gce.GCECloud, error) {
defer l.mutex.Unlock() defer l.mutex.Unlock()
if l.gceCloudProvider == nil { if l.gceCloudProvider == nil {
cloudProvider, err := cloudprovider.GetCloudProvider("gce", nil) var cloudConfigReader io.Reader
if len(l.cloudConfig) > 0 {
cloudConfigReader = bytes.NewReader(l.cloudConfig)
}
cloudProvider, err := cloudprovider.GetCloudProvider("gce", cloudConfigReader)
if err != nil || cloudProvider == nil { if err != nil || cloudProvider == nil {
return nil, err return nil, err
} }

View File

@ -183,7 +183,7 @@ func TestHandles(t *testing.T) {
func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) { func newHandlerForTest(c clientset.Interface) (*podNodeSelector, informers.SharedInformerFactory, error) {
f := informers.NewSharedInformerFactory(c, 5*time.Minute) f := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewPodNodeSelector(nil) handler := NewPodNodeSelector(nil)
pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil) pluginInitializer := kubeadmission.NewPluginInitializer(c, f, nil, nil)
pluginInitializer.Initialize(handler) pluginInitializer.Initialize(handler)
err := admission.Validate(handler) err := admission.Validate(handler)
return handler, f, err return handler, f, err