diff --git a/contrib/mesos/pkg/executor/apis.go b/contrib/mesos/pkg/executor/apis.go index e9b63084830..905d691cc5b 100644 --- a/contrib/mesos/pkg/executor/apis.go +++ b/contrib/mesos/pkg/executor/apis.go @@ -19,7 +19,7 @@ package executor import ( "k8s.io/kubernetes/contrib/mesos/pkg/node" "k8s.io/kubernetes/pkg/api" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + core_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" ) type kubeAPI interface { @@ -33,11 +33,11 @@ type nodeAPI interface { // clientAPIWrapper implements kubeAPI and node API, which serve to isolate external dependencies // such that they're easier to mock in unit test. type clientAPIWrapper struct { - client *clientset.Clientset + client core_unversioned.CoreInterface } func (cw *clientAPIWrapper) killPod(ns, name string) error { - return cw.client.Core().Pods(ns).Delete(name, api.NewDeleteOptions(0)) + return cw.client.Pods(ns).Delete(name, api.NewDeleteOptions(0)) } func (cw *clientAPIWrapper) createOrUpdate(hostname string, slaveAttrLabels, annotations map[string]string) (*api.Node, error) { diff --git a/contrib/mesos/pkg/node/node.go b/contrib/mesos/pkg/node/node.go index 2e7ce35cb57..68471c6a0d4 100644 --- a/contrib/mesos/pkg/node/node.go +++ b/contrib/mesos/pkg/node/node.go @@ -23,7 +23,7 @@ import ( "strings" "time" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + core_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" log "github.com/golang/glog" mesos "github.com/mesos/mesos-go/mesosproto" @@ -42,7 +42,7 @@ const ( // Create creates a new node api object with the given hostname, // slave attribute labels and annotations func Create( - client *clientset.Clientset, + client core_unversioned.NodesGetter, hostName string, slaveAttrLabels, annotations map[string]string, @@ -88,7 +88,7 @@ func Create( // The updated node merges the given slave attribute labels // and annotations with the found api object. func Update( - client *clientset.Clientset, + client core_unversioned.NodesGetter, hostname string, slaveAttrLabels, annotations map[string]string, @@ -123,7 +123,7 @@ func Update( // CreateOrUpdate creates a node api object or updates an existing one func CreateOrUpdate( - client *clientset.Clientset, + client core_unversioned.NodesGetter, hostname string, slaveAttrLabels, annotations map[string]string, diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index e3f9b01f96f..362b67977c8 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -33,8 +33,6 @@ import ( "sync" "time" - clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - etcd "github.com/coreos/etcd/client" "github.com/gogo/protobuf/proto" log "github.com/golang/glog" @@ -72,9 +70,11 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" "k8s.io/kubernetes/pkg/client/cache" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" + unversioned_core "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned" - clientauth "k8s.io/kubernetes/pkg/client/unversioned/auth" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos" controllerfw "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/fields" @@ -104,7 +104,9 @@ type SchedulerServer struct { port int address net.IP enableProfiling bool - authPath string + kubeconfig string + kubeAPIQPS float32 + kubeAPIBurst int apiServerList []string etcdServerList []string allowPrivileged bool @@ -194,6 +196,8 @@ func NewSchedulerServer() *SchedulerServer { address: net.ParseIP("127.0.0.1"), failoverTimeout: time.Duration((1 << 62) - 1).Seconds(), frameworkStoreURI: "etcd://", + kubeAPIQPS: 50.0, + kubeAPIBurst: 100, runProxy: true, executorSuicideTimeout: execcfg.DefaultSuicideTimeout, @@ -250,7 +254,9 @@ func (s *SchedulerServer) addCoreFlags(fs *pflag.FlagSet) { fs.IPVar(&s.address, "address", s.address, "The IP address to serve on (set to 0.0.0.0 for all interfaces)") fs.BoolVar(&s.enableProfiling, "profiling", s.enableProfiling, "Enable profiling via web interface host:port/debug/pprof/") fs.StringSliceVar(&s.apiServerList, "api-servers", s.apiServerList, "List of Kubernetes API servers for publishing events, and reading pods and services. (ip:port), comma separated.") - fs.StringVar(&s.authPath, "auth-path", s.authPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.") + fs.StringVar(&s.kubeconfig, "kubeconfig", s.kubeconfig, "Path to kubeconfig file with authorization and master location information.") + fs.Float32Var(&s.kubeAPIQPS, "kube-api-qps", s.kubeAPIQPS, "QPS to use while talking with kubernetes apiserver") + fs.IntVar(&s.kubeAPIBurst, "kube-api-burst", s.kubeAPIBurst, "Burst to use while talking with kubernetes apiserver") fs.StringSliceVar(&s.etcdServerList, "etcd-servers", s.etcdServerList, "List of etcd servers to watch (http://ip:port), comma separated.") fs.BoolVar(&s.allowPrivileged, "allow-privileged", s.allowPrivileged, "Enable privileged containers in the kubelet (compare the same flag in the apiserver).") fs.StringVar(&s.clusterDomain, "cluster-domain", s.clusterDomain, "Domain for this cluster. If set, kubelet will configure all containers to search this domain in addition to the host's search domains") @@ -436,11 +442,11 @@ func (s *SchedulerServer) prepareExecutorInfo(hks hyperkube.Interface) (*mesos.E ci.Arguments = append(ci.Arguments, fmt.Sprintf("--conntrack-max=%d", s.conntrackMax)) ci.Arguments = append(ci.Arguments, fmt.Sprintf("--conntrack-tcp-timeout-established=%d", s.conntrackTCPTimeoutEstablished)) - if s.authPath != "" { + if s.kubeconfig != "" { //TODO(jdef) should probably support non-local files, e.g. hdfs:///some/config/file - uri, basename := s.serveFrameworkArtifact(s.authPath) + uri, basename := s.serveFrameworkArtifact(s.kubeconfig) ci.Uris = append(ci.Uris, &mesos.CommandInfo_URI{Value: proto.String(uri)}) - ci.Arguments = append(ci.Arguments, fmt.Sprintf("--auth-path=%s", basename)) + ci.Arguments = append(ci.Arguments, fmt.Sprintf("--kubeconfig=%s", basename)) } appendOptional := func(name string, value string) { if value != "" { @@ -520,34 +526,17 @@ func (s *SchedulerServer) prepareStaticPods() (data []byte, staticPodCPUs, stati return } -// TODO(jdef): hacked from kubelet/server/server.go -// TODO(k8s): replace this with clientcmd -func (s *SchedulerServer) createAPIServerClient() (*clientset.Clientset, error) { - authInfo, err := clientauth.LoadFromFile(s.authPath) - if err != nil { - log.Warningf("Could not load kubernetes auth path: %v. Continuing with defaults.", err) - } - if authInfo == nil { - // authInfo didn't load correctly - continue with defaults. - authInfo = &clientauth.Info{} - } - clientConfig, err := authInfo.MergeWithConfig(client.Config{}) +// TODO(jdef): hacked from plugin/cmd/kube-scheduler/app/server.go +func (s *SchedulerServer) createAPIServerClientConfig() (*client.Config, error) { + kubeconfig, err := clientcmd.BuildConfigFromFlags(s.apiServerList[0], s.kubeconfig) if err != nil { return nil, err } - if len(s.apiServerList) < 1 { - return nil, fmt.Errorf("no api servers specified") - } - // TODO: adapt Kube client to support LB over several servers - if len(s.apiServerList) > 1 { - log.Infof("Multiple api servers specified. Picking first one") - } - clientConfig.Host = s.apiServerList[0] - c, err := clientset.NewForConfig(&clientConfig) - if err != nil { - return nil, err - } - return c, nil + + // Override kubeconfig qps/burst settings from flags + kubeconfig.QPS = s.kubeAPIQPS + kubeconfig.Burst = s.kubeAPIBurst + return kubeconfig, nil } func (s *SchedulerServer) setDriver(driver bindings.SchedulerDriver) { @@ -691,11 +680,14 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config log.Fatal("No api servers specified.") } - client, err := s.createAPIServerClient() + clientConfig, err := s.createAPIServerClientConfig() if err != nil { - log.Fatalf("Unable to make apiserver client: %v", err) + log.Fatalf("Unable to make apiserver client config: %v", err) + } + s.client, err = clientset.NewForConfig(clientConfig) + if err != nil { + log.Fatalf("Unable to make apiserver clientset: %v", err) } - s.client = client if s.reconcileCooldown < defaultReconcileCooldown { s.reconcileCooldown = defaultReconcileCooldown @@ -719,7 +711,8 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config // mirror all nodes into the nodeStore var eiRegistry executorinfo.Registry - nodesClient, err := s.createAPIServerClient() + nodesClientConfig := *clientConfig + nodesClient, err := clientset.NewForConfig(&nodesClientConfig) if err != nil { log.Fatalf("Cannot create client to watch nodes: %v", err) } @@ -760,7 +753,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config } framework := framework.New(framework.Config{ SchedulerConfig: *sc, - Client: client, + Client: s.client, FailoverTimeout: s.failoverTimeout, ReconcileInterval: s.reconcileInterval, ReconcileCooldown: s.reconcileCooldown, @@ -791,18 +784,23 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config } // create event recorder sending events to the "" namespace of the apiserver + eventsClientConfig := *clientConfig + eventsClient, err := clientset.NewForConfig(&eventsClientConfig) + if err != nil { + log.Fatalf("Invalid API configuration: %v", err) + } broadcaster := record.NewBroadcaster() recorder := broadcaster.NewRecorder(api.EventSource{Component: api.DefaultSchedulerName}) broadcaster.StartLogging(log.Infof) - broadcaster.StartRecordingToSink(client.Events("")) + broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{eventsClient.Events("")}) // create scheduler core with all components arranged around it - lw := cache.NewListWatchFromClient(client.CoreClient, "pods", api.NamespaceAll, fields.Everything()) + lw := cache.NewListWatchFromClient(s.client.CoreClient, "pods", api.NamespaceAll, fields.Everything()) sched := components.New( sc, framework, fcfs, - client, + s.client, recorder, schedulerProcess.Terminal(), s.mux,