diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 65cb21487a3..c49b9cc3c95 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -33,19 +33,16 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util/exec" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/iptables" - "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" "github.com/spf13/pflag" ) // ProxyServer contains configures and runs a Kubernetes proxy server type ProxyServer struct { - EtcdServerList util.StringList - EtcdConfigFile string - BindAddress util.IP - ClientConfig client.Config - HealthzPort int - OOMScoreAdj int + BindAddress util.IP + ClientConfig client.Config + HealthzPort int + OOMScoreAdj int } // NewProxyServer creates a new ProxyServer object with default parameters @@ -59,8 +56,6 @@ func NewProxyServer() *ProxyServer { // AddFlags adds flags for a specific ProxyServer to the specified FlagSet func (s *ProxyServer) AddFlags(fs *pflag.FlagSet) { - fs.StringVar(&s.EtcdConfigFile, "etcd_config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd_servers") - fs.Var(&s.EtcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional). Mutually exclusive with -etcd_config") fs.Var(&s.BindAddress, "bind_address", "The IP address for the proxy server to serve on (set to 0.0.0.0 for all interfaces)") client.BindClientConfigFlags(fs, &s.ClientConfig) fs.IntVar(&s.HealthzPort, "healthz_port", s.HealthzPort, "The port to bind the health check server. Use 0 to disable.") @@ -109,33 +104,6 @@ func (s *ProxyServer) Run(_ []string) error { serviceConfig.Channel("api"), endpointsConfig.Channel("api"), ) - } else { - var etcdClient *etcd.Client - - // Set up etcd client - if len(s.EtcdServerList) > 0 { - // Set up logger for etcd client - etcd.SetLogger(util.NewLogger("etcd ")) - etcdClient = etcd.NewClient(s.EtcdServerList) - } else if s.EtcdConfigFile != "" { - // Set up logger for etcd client - etcd.SetLogger(util.NewLogger("etcd ")) - var err error - etcdClient, err = etcd.NewClientFromFile(s.EtcdConfigFile) - - if err != nil { - glog.Fatalf("Error with etcd config file: %v", err) - } - } - - // Create a configuration source that handles configuration from etcd. - if etcdClient != nil { - glog.Infof("Using etcd servers %v", etcdClient.GetCluster()) - - config.NewConfigSourceEtcd(etcdClient, - serviceConfig.Channel("etcd"), - endpointsConfig.Channel("etcd")) - } } if s.HealthzPort > 0 { diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 4946a9c96e3..4b6e8d3e4a2 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -60,8 +60,6 @@ type KubeletServer struct { HostnameOverride string PodInfraContainerImage string DockerEndpoint string - EtcdServerList util.StringList - EtcdConfigFile string RootDirectory string AllowPrivileged bool RegistryPullQPS float64 @@ -115,13 +113,11 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.HostnameOverride, "hostname_override", s.HostnameOverride, "If non-empty, will use this string as identification instead of the actual hostname.") fs.StringVar(&s.PodInfraContainerImage, "pod_infra_container_image", s.PodInfraContainerImage, "The image whose network/ipc namespaces containers in each pod will use.") fs.StringVar(&s.DockerEndpoint, "docker_endpoint", s.DockerEndpoint, "If non-empty, use this for the docker endpoint to communicate with") - fs.Var(&s.EtcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd_config") - fs.StringVar(&s.EtcdConfigFile, "etcd_config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd_servers") fs.StringVar(&s.RootDirectory, "root_dir", s.RootDirectory, "Directory path for managing kubelet files (volume mounts,etc).") fs.BoolVar(&s.AllowPrivileged, "allow_privileged", s.AllowPrivileged, "If true, allow containers to request privileged mode. [default=false]") fs.Float64Var(&s.RegistryPullQPS, "registry_qps", s.RegistryPullQPS, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") fs.IntVar(&s.RegistryBurst, "registry_burst", s.RegistryBurst, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") - fs.BoolVar(&s.RunOnce, "runonce", s.RunOnce, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers, --api_servers, and --enable-server") + fs.BoolVar(&s.RunOnce, "runonce", s.RunOnce, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api_servers, and --enable-server") fs.BoolVar(&s.EnableDebuggingHandlers, "enable_debugging_handlers", s.EnableDebuggingHandlers, "Enables server endpoints for log collection and local running of containers and commands") fs.DurationVar(&s.MinimumGCAge, "minimum_container_ttl_duration", s.MinimumGCAge, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") fs.IntVar(&s.MaxContainerCount, "maximum_dead_containers_per_container", s.MaxContainerCount, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.") @@ -141,20 +137,6 @@ func (s *KubeletServer) Run(_ []string) error { util.ReallyCrash = s.ReallyCrashForTesting rand.Seed(time.Now().UTC().UnixNano()) - // Cluster creation scripts support both kubernetes versions that 1) - // support kublet watching apiserver for pods, and 2) ones that don't. So - // they can set both --etcd_servers and --api_servers. The current code - // will ignore the --etcd_servers flag, while older kubelet code will use - // the --etcd_servers flag for pods, and use --api_servers for event - // publising. - // - // TODO(erictune): convert all cloud provider scripts and Google Container Engine to - // use only --api_servers, then delete --etcd_servers flag and the resulting dead code. - if len(s.EtcdServerList) > 0 && len(s.APIServerList) > 0 { - glog.Infof("Both --etcd_servers and --api_servers are set. Not using etcd source.") - s.EtcdServerList = util.StringList{} - } - if err := util.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil { glog.Info(err) } @@ -192,7 +174,6 @@ func (s *KubeletServer) Run(_ []string) error { EnableDebuggingHandlers: s.EnableDebuggingHandlers, DockerClient: dockertools.ConnectToDockerOrDie(s.DockerEndpoint), KubeClient: client, - EtcdClient: kubelet.EtcdClientOrDie(s.EtcdServerList, s.EtcdConfigFile), MasterServiceNamespace: s.MasterServiceNamespace, VolumePlugins: ProbeVolumePlugins(), StreamingConnectionIdleTimeout: s.StreamingConnectionIdleTimeout, @@ -208,9 +189,6 @@ func (s *KubeletServer) Run(_ []string) error { func (s *KubeletServer) setupRunOnce() { if s.RunOnce { // Don't use remote (etcd or apiserver) sources - if len(s.EtcdServerList) > 0 { - glog.Fatalf("invalid option: --runonce and --etcd_servers are mutually exclusive") - } if len(s.APIServerList) > 0 { glog.Fatalf("invalid option: --runonce and --api_servers are mutually exclusive") } diff --git a/pkg/kubelet/util.go b/pkg/kubelet/util.go index 9c453f27d15..e9afc881a03 100644 --- a/pkg/kubelet/util.go +++ b/pkg/kubelet/util.go @@ -20,26 +20,9 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) -// TODO: move this into a pkg/tools/etcd_tools -func EtcdClientOrDie(etcdServerList util.StringList, etcdConfigFile string) tools.EtcdClient { - if len(etcdServerList) > 0 { - return etcd.NewClient(etcdServerList) - } else if etcdConfigFile != "" { - etcdClient, err := etcd.NewClientFromFile(etcdConfigFile) - if err != nil { - glog.Fatalf("Error with etcd config file: %v", err) - } - return etcdClient - } - return nil -} - // TODO: move this into pkg/capabilities func SetupCapabilities(allowPrivileged bool) { capabilities.Initialize(capabilities.Capabilities{ @@ -49,7 +32,6 @@ func SetupCapabilities(allowPrivileged bool) { // TODO: Split this up? func SetupLogging() { - etcd.SetLogger(util.NewLogger("etcd ")) // Log the events locally too. record.StartLogging(glog.Infof) } diff --git a/pkg/proxy/config/etcd.go b/pkg/proxy/config/etcd.go deleted file mode 100644 index 549f82f0774..00000000000 --- a/pkg/proxy/config/etcd.go +++ /dev/null @@ -1,264 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Watches etcd and gets the full configuration on preset intervals. -// It expects the list of exposed services to live under: -// registry/services -// which in etcd is exposed like so: -// http:///v2/keys/registry/services -// -// The port that proxy needs to listen in for each service is a value in: -// registry/services/ -// -// The endpoints for each of the services found is a json string -// representing that service at: -// /registry/services//endpoint -// and the format is: -// '[ { "machine": , "name": }, -// { "machine": , "name": } -// ]', - -package config - -import ( - "fmt" - "path" - "strings" - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/coreos/go-etcd/etcd" - "github.com/golang/glog" -) - -// registryRoot is the key prefix for service configs in etcd. -const registryRoot = "registry/services" - -// ConfigSourceEtcd communicates with a etcd via the client, and sends the change notification of services and endpoints to the specified channels. -type ConfigSourceEtcd struct { - client *etcd.Client - serviceChannel chan ServiceUpdate - endpointsChannel chan EndpointsUpdate - interval time.Duration -} - -// NewConfigSourceEtcd creates a new ConfigSourceEtcd and immediately runs the created ConfigSourceEtcd in a goroutine. -func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd { - config := ConfigSourceEtcd{ - client: client, - serviceChannel: serviceChannel, - endpointsChannel: endpointsChannel, - interval: 2 * time.Second, - } - go config.Run() - return config -} - -// Run begins watching for new services and their endpoints on etcd. -func (s ConfigSourceEtcd) Run() { - // Initially, just wait for the etcd to come up before doing anything more complicated. - var services []api.Service - var endpoints []api.Endpoints - var err error - for { - services, endpoints, err = s.GetServices() - if err == nil { - break - } - glog.V(1).Infof("Failed to get any services: %v", err) - time.Sleep(s.interval) - } - - if len(services) > 0 { - serviceUpdate := ServiceUpdate{Op: SET, Services: services} - s.serviceChannel <- serviceUpdate - } - if len(endpoints) > 0 { - endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints} - s.endpointsChannel <- endpointsUpdate - } - - // Ok, so we got something back from etcd. Let's set up a watch for new services, and - // their endpoints - go util.Forever(s.WatchForChanges, 1*time.Second) - - for { - services, endpoints, err = s.GetServices() - if err != nil { - glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err) - } else { - if len(services) > 0 { - serviceUpdate := ServiceUpdate{Op: SET, Services: services} - s.serviceChannel <- serviceUpdate - } - if len(endpoints) > 0 { - endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: endpoints} - s.endpointsChannel <- endpointsUpdate - } - } - time.Sleep(30 * time.Second) - } -} - -// decodeServices recurses from the root of the service storage directory into each namespace to get each service and endpoint object -func (s ConfigSourceEtcd) decodeServices(node *etcd.Node, retServices []api.Service, retEndpoints []api.Endpoints) ([]api.Service, []api.Endpoints, error) { - // TODO this needs to go against API server desperately, so much redundant error prone code here - // we hit a namespace boundary, recurse until we find actual nodes - if node.Dir == true { - for _, n := range node.Nodes { - var err error // Don't shadow the ret* variables. - retServices, retEndpoints, err = s.decodeServices(n, retServices, retEndpoints) - if err != nil { - return retServices, retEndpoints, err - } - } - return retServices, retEndpoints, nil - } - - // we have an actual service node - var svc api.Service - err := latest.Codec.DecodeInto([]byte(node.Value), &svc) - if err != nil { - glog.Errorf("Failed to load Service: %s (%v)", node.Value, err) - } else { - // so we got a service we can handle, and now get endpoints - retServices = append(retServices, svc) - // get the endpoints - endpoints, err := s.GetEndpoints(svc.Namespace, svc.Name) - if err != nil { - if tools.IsEtcdNotFound(err) { - glog.V(4).Infof("Unable to get endpoints for %s %s : %v", svc.Namespace, svc.Name, err) - } - glog.Errorf("Couldn't get endpoints for %s %s : %v skipping", svc.Namespace, svc.Name, err) - endpoints = api.Endpoints{} - } else { - glog.V(3).Infof("Got service: %s %s on localport %d mapping to: %s", svc.Namespace, svc.Name, svc.Spec.Port, endpoints) - } - retEndpoints = append(retEndpoints, endpoints) - } - return retServices, retEndpoints, nil -} - -// GetServices finds the list of services and their endpoints from etcd. -// This operation is akin to a set a known good at regular intervals. -func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) { - // this is a recursive query now that services are namespaced under "/registry/services/specs//" - response, err := s.client.Get(registryRoot+"/specs", false, true) - if err != nil { - if tools.IsEtcdNotFound(err) { - glog.V(4).Infof("Failed to get the key %s: %v", registryRoot, err) - } else { - glog.Errorf("Failed to contact etcd for key %s: %v", registryRoot, err) - } - return []api.Service{}, []api.Endpoints{}, err - } - // this code needs to go through the API server in the future, this is one big hack - if response.Node.Dir == true { - retServices := []api.Service{} - retEndpoints := []api.Endpoints{} - return s.decodeServices(response.Node, retServices, retEndpoints) - } - return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot) -} - -// GetEndpoints finds the list of endpoints of the service from etcd. -func (s ConfigSourceEtcd) GetEndpoints(namespace, service string) (api.Endpoints, error) { - key := path.Join(registryRoot, "endpoints", namespace, service) - response, err := s.client.Get(key, true, false) - if err != nil { - glog.Errorf("Failed to get the key %q: %v", key, err) - return api.Endpoints{}, err - } - // Parse all the endpoint specifications in this value. - var e api.Endpoints - err = latest.Codec.DecodeInto([]byte(response.Node.Value), &e) - return e, err -} - -// etcdResponseToService takes an etcd response and pulls it apart to find service. -func etcdResponseToService(response *etcd.Response) (*api.Service, error) { - if response.Node == nil { - return nil, fmt.Errorf("invalid response from etcd: %#v", response) - } - var svc api.Service - err := latest.Codec.DecodeInto([]byte(response.Node.Value), &svc) - if err != nil { - return nil, err - } - return &svc, err -} - -func (s ConfigSourceEtcd) WatchForChanges() { - glog.V(4).Info("Setting up a watch for new services") - watchChannel := make(chan *etcd.Response) - go s.client.Watch("/registry/services/", 0, true, watchChannel, nil) - for { - watchResponse, ok := <-watchChannel - if !ok { - break - } - // only listen for non directory changes - if watchResponse.Node.Dir == false { - s.ProcessChange(watchResponse) - } - } -} - -func (s ConfigSourceEtcd) ProcessChange(response *etcd.Response) { - glog.V(4).Infof("Processing a change in service configuration... %s", *response) - - // If it's a new service being added (signified by a localport being added) - // then process it as such - if strings.Contains(response.Node.Key, "/endpoints/") { - s.ProcessEndpointResponse(response) - } else if response.Action == "set" { - service, err := etcdResponseToService(response) - if err != nil { - glog.Errorf("Failed to parse %#v Port: %s", response, err) - return - } - - glog.V(4).Infof("New service added/updated: %#v", service) - serviceUpdate := ServiceUpdate{Op: ADD, Services: []api.Service{*service}} - s.serviceChannel <- serviceUpdate - return - } - if response.Action == "delete" { - parts := strings.Split(response.Node.Key[1:], "/") - if len(parts) == 4 { - glog.V(4).Infof("Deleting service: %s", parts[3]) - serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{ObjectMeta: api.ObjectMeta{Name: parts[3]}}}} - s.serviceChannel <- serviceUpdate - return - } - glog.Warningf("Unknown service delete: %#v", parts) - } -} - -func (s ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) { - glog.V(4).Infof("Processing a change in endpoint configuration... %s", *response) - var endpoints api.Endpoints - err := latest.Codec.DecodeInto([]byte(response.Node.Value), &endpoints) - if err != nil { - glog.Errorf("Failed to parse service out of etcd key: %v : %v", response.Node.Value, err) - return - } - endpointsUpdate := EndpointsUpdate{Op: ADD, Endpoints: []api.Endpoints{endpoints}} - s.endpointsChannel <- endpointsUpdate -}