Merge pull request #60483 from hzxuzhonghu/kube-apiserver-runoptions

Automatic merge from submit-queue (batch tested with PRs 58420, 60483). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Kube apiserver ServerRunOptions set default called before use

**What this PR does / why we need it**:

move `ServerRunOptions` set default function  `defaultOptions` out of `CreateKubeAPIServerConfig`, it should be called before real use `CreateNodeDialer`. So move it to cobra.Command just after kube-apiserver flags parsed.

Similarly `ServerRunOptions.Validate` move there too.

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2018-04-04 05:03:08 -07:00 committed by GitHub
commit 8438cbe669
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 88 additions and 152 deletions

View File

@ -112,15 +112,23 @@ func NewAPIServerCommand() *cobra.Command {
for the api objects which include pods, services, replicationcontrollers, and
others. The API Server services REST operations and provides the frontend to the
cluster's shared state through which all other components interact.`,
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
utilflag.PrintFlags(cmd.Flags())
stopCh := server.SetupSignalHandler()
if err := Run(s, stopCh); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
// set default options
completedOptions, err := Complete(s)
if err != nil {
return err
}
// validate options
if errs := completedOptions.Validate(); len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
stopCh := server.SetupSignalHandler()
return Run(completedOptions, stopCh)
},
}
s.AddFlags(cmd.Flags())
@ -129,11 +137,11 @@ cluster's shared state through which all other components interact.`,
}
// Run runs the specified APIServer. This should never exit.
func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
glog.Infof("Version: %+v", version.Get())
server, err := CreateServerChain(runOptions, stopCh)
server, err := CreateServerChain(completeOptions, stopCh)
if err != nil {
return err
}
@ -142,19 +150,19 @@ func Run(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) error {
}
// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(runOptions)
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*genericapiserver.GenericAPIServer, error) {
nodeTunneler, proxyTransport, err := CreateNodeDialer(completedOptions)
if err != nil {
return nil, err
}
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(runOptions, nodeTunneler, proxyTransport)
kubeAPIServerConfig, sharedInformers, versionedInformers, insecureServingOptions, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
if err != nil {
return nil, err
}
// If additional API servers are added, they should be gated.
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, versionedInformers, pluginInitializer, runOptions)
apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, versionedInformers, pluginInitializer, completedOptions.ServerRunOptions)
if err != nil {
return nil, err
}
@ -189,7 +197,7 @@ func CreateServerChain(runOptions *options.ServerRunOptions, stopCh <-chan struc
apiExtensionsServer.GenericAPIServer.PrepareRun()
// aggregator comes last in the chain
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, runOptions, versionedInformers, serviceResolver, proxyTransport, pluginInitializer)
aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, versionedInformers, serviceResolver, proxyTransport, pluginInitializer)
if err != nil {
return nil, err
}
@ -224,7 +232,7 @@ func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer g
}
// CreateNodeDialer creates the dialer infrastructure to connect to the nodes.
func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
func CreateNodeDialer(s completedServerRunOptions) (tunneler.Tunneler, *http.Transport, error) {
// Setup nodeTunneler if needed
var nodeTunneler tunneler.Tunneler
var proxyDialerFn utilnet.DialFunc
@ -270,7 +278,7 @@ func CreateNodeDialer(s *options.ServerRunOptions) (tunneler.Tunneler, *http.Tra
// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(
s *options.ServerRunOptions,
s completedServerRunOptions,
nodeTunneler tunneler.Tunneler,
proxyTransport *http.Transport,
) (
@ -282,19 +290,8 @@ func CreateKubeAPIServerConfig(
pluginInitializers []admission.PluginInitializer,
lastErr error,
) {
// set defaults in the options before trying to create the generic config
if lastErr = defaultOptions(s); lastErr != nil {
return
}
// validate options
if errs := s.Validate(); len(errs) != 0 {
lastErr = utilerrors.NewAggregate(errs)
return
}
var genericConfig *genericapiserver.Config
genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, lastErr = BuildGenericConfig(s, proxyTransport)
genericConfig, sharedInformers, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, lastErr = BuildGenericConfig(s.ServerRunOptions, proxyTransport)
if lastErr != nil {
return
}
@ -322,7 +319,7 @@ func CreateKubeAPIServerConfig(
return
}
storageFactory, lastErr := BuildStorageFactory(s, genericConfig.MergedResourceConfig)
storageFactory, lastErr := BuildStorageFactory(s.ServerRunOptions, genericConfig.MergedResourceConfig)
if lastErr != nil {
return
}
@ -673,21 +670,29 @@ func BuildStorageFactory(s *options.ServerRunOptions, apiResourceConfig *servers
return storageFactory, nil
}
func defaultOptions(s *options.ServerRunOptions) error {
// completedServerRunOptions is a private wrapper that enforces a call of Complete() before Run can be invoked.
type completedServerRunOptions struct {
*options.ServerRunOptions
}
// Complete set default ServerRunOptions.
// Should be called after kube-apiserver flags parsed.
func Complete(s *options.ServerRunOptions) (completedServerRunOptions, error) {
var options completedServerRunOptions
// set defaults
if err := s.GenericServerRunOptions.DefaultAdvertiseAddress(s.SecureServing.SecureServingOptions); err != nil {
return err
return options, err
}
if err := kubeoptions.DefaultAdvertiseAddress(s.GenericServerRunOptions, s.InsecureServing); err != nil {
return err
return options, err
}
serviceIPRange, apiServerServiceIP, err := master.DefaultServiceIPRange(s.ServiceClusterIPRange)
if err != nil {
return fmt.Errorf("error determining service IP ranges: %v", err)
return options, fmt.Errorf("error determining service IP ranges: %v", err)
}
s.ServiceClusterIPRange = serviceIPRange
if err := s.SecureServing.MaybeDefaultWithSelfSignedCerts(s.GenericServerRunOptions.AdvertiseAddress.String(), []string{"kubernetes.default.svc", "kubernetes.default", "kubernetes"}, []net.IP{apiServerServiceIP}); err != nil {
return fmt.Errorf("error creating self-signed certificates: %v", err)
return options, fmt.Errorf("error creating self-signed certificates: %v", err)
}
if len(s.GenericServerRunOptions.ExternalHost) == 0 {
@ -697,7 +702,7 @@ func defaultOptions(s *options.ServerRunOptions) error {
if hostname, err := os.Hostname(); err == nil {
s.GenericServerRunOptions.ExternalHost = hostname
} else {
return fmt.Errorf("error finding host name: %v", err)
return options, fmt.Errorf("error finding host name: %v", err)
}
}
glog.Infof("external host was not specified, using %v", s.GenericServerRunOptions.ExternalHost)
@ -752,7 +757,7 @@ func defaultOptions(s *options.ServerRunOptions) error {
}
s.Etcd.WatchCacheSizes, err = serveroptions.WriteWatchCacheSizes(sizes)
if err != nil {
return err
return options, err
}
}
@ -769,8 +774,8 @@ func defaultOptions(s *options.ServerRunOptions) error {
}
}
}
return nil
options.ServerRunOptions = s
return options, nil
}
func readCAorNil(file string) ([]byte, error) {

View File

@ -102,9 +102,13 @@ func StartTestServer(t Logger, customFlags []string, storageConfig *storagebacke
s.APIEnablement.RuntimeConfig.Set("api/all=true")
fs.Parse(customFlags)
completedOptions, err := app.Complete(s)
if err != nil {
return result, fmt.Errorf("failed to set default ServerRunOptions: %v", err)
}
t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort)
server, err := app.CreateServerChain(s, stopCh)
server, err := app.CreateServerChain(completedOptions, stopCh)
if err != nil {
return result, fmt.Errorf("failed to create server chain: %v", err)

View File

@ -1,5 +1,6 @@
cluster/images/etcd-version-monitor
cmd/hyperkube
cmd/kube-apiserver/app
cmd/kube-controller-manager/app
cmd/kube-proxy/app
cmd/kube-scheduler/app

View File

@ -40,27 +40,33 @@ func NewAPIServer() *APIServer {
// Start starts the apiserver, returns when apiserver is ready.
func (a *APIServer) Start() error {
config := options.NewServerRunOptions()
config.Etcd.StorageConfig.ServerList = []string{getEtcdClientURL()}
o := options.NewServerRunOptions()
o.Etcd.StorageConfig.ServerList = []string{getEtcdClientURL()}
// TODO: Current setup of etcd in e2e-node tests doesn't support etcd v3
// protocol. We should migrate it to use the same infrastructure as all
// other tests (pkg/storage/etcd/testing).
config.Etcd.StorageConfig.Type = "etcd2"
o.Etcd.StorageConfig.Type = "etcd2"
_, ipnet, err := net.ParseCIDR(clusterIPRange)
if err != nil {
return err
}
config.ServiceClusterIPRange = *ipnet
config.AllowPrivileged = true
config.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
o.ServiceClusterIPRange = *ipnet
o.AllowPrivileged = true
o.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
errCh := make(chan error)
go func() {
defer close(errCh)
stopCh := make(chan struct{})
defer close(stopCh)
err := apiserver.Run(config, stopCh)
completedOptions, err := apiserver.Complete(o)
if err != nil {
errCh <- fmt.Errorf("set apiserver default options error: %v", err)
return
}
err = apiserver.Run(completedOptions, stopCh)
if err != nil {
errCh <- fmt.Errorf("run apiserver error: %v", err)
return
}
}()

View File

@ -735,12 +735,16 @@ func startRealMasterOrDie(t *testing.T, certDir string) (*allClient, clientv3.KV
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
kubeAPIServerOptions.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
completedOptions, err := app.Complete(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}

View File

@ -110,12 +110,16 @@ func TestAggregatedAPIServer(t *testing.T) {
kubeAPIServerOptions.Authentication.RequestHeader.ClientCAFile = proxyCACertFile.Name()
kubeAPIServerOptions.Authentication.ClientCert.ClientCA = clientCACertFile.Name()
kubeAPIServerOptions.Authorization.Modes = []string{"RBAC"}
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
completedOptions, err := app.Complete(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
tunneler, proxyTransport, err := app.CreateNodeDialer(completedOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}

View File

@ -9,14 +9,8 @@ go_test(
],
tags = ["integration"],
deps = [
"//cmd/kube-apiserver/app:go_default_library",
"//cmd/kube-apiserver/app/options:go_default_library",
"//cmd/kube-apiserver/app/testing:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
],
)

View File

@ -19,107 +19,26 @@ package tls
import (
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"sync/atomic"
"strings"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
genericapiserver "k8s.io/apiserver/pkg/server"
genericapiserveroptions "k8s.io/apiserver/pkg/server/options"
client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration/framework"
)
func runBasicSecureAPIServer(t *testing.T, ciphers []string) (uint32, error) {
certDir, _ := ioutil.TempDir("", "test-integration-tls")
defer os.RemoveAll(certDir)
_, defaultServiceClusterIPRange, _ := net.ParseCIDR("10.0.0.0/24")
kubeClientConfigValue := atomic.Value{}
var kubePort uint32
go func() {
listener, port, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
atomic.StoreUint32(&kubePort, uint32(port))
kubeAPIServerOptions := options.NewServerRunOptions()
kubeAPIServerOptions.SecureServing.BindAddress = net.ParseIP("127.0.0.1")
kubeAPIServerOptions.SecureServing.BindPort = port
kubeAPIServerOptions.SecureServing.Listener = listener
kubeAPIServerOptions.SecureServing.ServerCert.CertDirectory = certDir
kubeAPIServerOptions.SecureServing.CipherSuites = ciphers
kubeAPIServerOptions.InsecureServing.BindPort = 0
kubeAPIServerOptions.Etcd.StorageConfig.ServerList = []string{framework.GetEtcdURL()}
kubeAPIServerOptions.ServiceClusterIPRange = *defaultServiceClusterIPRange
tunneler, proxyTransport, err := app.CreateNodeDialer(kubeAPIServerOptions)
if err != nil {
t.Fatal(err)
}
kubeAPIServerConfig, sharedInformers, versionedInformers, _, _, _, err := app.CreateKubeAPIServerConfig(kubeAPIServerOptions, tunneler, proxyTransport)
if err != nil {
t.Fatal(err)
}
kubeClientConfigValue.Store(kubeAPIServerConfig.GenericConfig.LoopbackClientConfig)
kubeAPIServer, err := app.CreateKubeAPIServer(kubeAPIServerConfig, genericapiserver.EmptyDelegate, sharedInformers, versionedInformers)
if err != nil {
t.Fatal(err)
}
if err := kubeAPIServer.GenericAPIServer.PrepareRun().Run(wait.NeverStop); err != nil {
t.Log(err)
}
time.Sleep(100 * time.Millisecond)
}()
// Ensure server is ready
err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
obj := kubeClientConfigValue.Load()
if obj == nil {
return false, nil
}
kubeClientConfig := kubeClientConfigValue.Load().(*rest.Config)
kubeClientConfig.ContentType = ""
kubeClientConfig.AcceptContentTypes = ""
kubeClient, err := client.NewForConfig(kubeClientConfig)
if err != nil {
// this happens because we race the API server start
t.Log(err)
return false, nil
}
if _, err := kubeClient.Discovery().ServerVersion(); err != nil {
return false, nil
}
return true, nil
})
if err != nil {
return 0, err
}
securePort := atomic.LoadUint32(&kubePort)
return securePort, nil
func runBasicSecureAPIServer(t *testing.T, ciphers []string) (kubeapiservertesting.TearDownFunc, int) {
flags := []string{"--tls-cipher-suites", strings.Join(ciphers, ",")}
testServer := kubeapiservertesting.StartTestServerOrDie(t, flags, framework.SharedEtcd())
return testServer.TearDownFn, testServer.ServerOpts.SecureServing.BindPort
}
func TestAPICiphers(t *testing.T) {
basicServerCiphers := []string{"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305", "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305", "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA", "TLS_RSA_WITH_AES_128_GCM_SHA256", "TLS_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA", "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA", "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA"}
kubePort, err := runBasicSecureAPIServer(t, basicServerCiphers)
if err != nil {
t.Fatal(err)
}
tearDown, port := runBasicSecureAPIServer(t, basicServerCiphers)
defer tearDown()
tests := []struct {
clientCiphers []uint16
expectedError bool
@ -137,11 +56,11 @@ func TestAPICiphers(t *testing.T) {
}
for i, test := range tests {
runTestAPICiphers(t, i, kubePort, test.clientCiphers, test.expectedError)
runTestAPICiphers(t, i, port, test.clientCiphers, test.expectedError)
}
}
func runTestAPICiphers(t *testing.T, testID int, kubePort uint32, clientCiphers []uint16, expectedError bool) {
func runTestAPICiphers(t *testing.T, testID int, kubePort int, clientCiphers []uint16, expectedError bool) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{
@ -155,14 +74,13 @@ func runTestAPICiphers(t *testing.T, testID int, kubePort uint32, clientCiphers
t.Fatal(err)
}
resp, err := client.Do(req)
if err == nil {
defer resp.Body.Close()
}
if expectedError == true && err == nil {
t.Fatalf("%d: expecting error for cipher test, client cipher is supported and it should't", testID)
} else if err != nil && expectedError == false {
t.Fatalf("%d: not expecting error by client with cipher failed: %+v", testID, err)
}
if err == nil {
defer resp.Body.Close()
}
}