diff --git a/pkg/labels/labels.go b/pkg/labels/labels.go index 351289ea639..8c1e050f876 100644 --- a/pkg/labels/labels.go +++ b/pkg/labels/labels.go @@ -23,14 +23,15 @@ import ( // Labels allows you to present labels independently from their storage. type Labels interface { + // Get returns the value for the provided label. Get(label string) (value string) } -// A map of label:value. Implements Labels. +// Set is a map of label:value. It implements Labels. type Set map[string]string -// All labels listed as a human readable string. Conveniently, exactly the format -// that ParseSelector takes. +// String returns all labels listed as a human readable string. +// Conveniently, exactly the format that ParseSelector takes. func (ls Set) String() string { selector := make([]string, 0, len(ls)) for key, value := range ls { @@ -41,12 +42,12 @@ func (ls Set) String() string { return strings.Join(selector, ",") } -// Implement Labels interface. +// Get returns the value in the map for the provided label. func (ls Set) Get(label string) string { return ls[label] } -// Convenience function: convert these labels to a selector. +// AsSelector converts labels into a selectors. func (ls Set) AsSelector() Selector { return SelectorFromSet(ls) } diff --git a/pkg/labels/selector.go b/pkg/labels/selector.go index ac9c534d921..42adf008acd 100644 --- a/pkg/labels/selector.go +++ b/pkg/labels/selector.go @@ -22,12 +22,12 @@ import ( "strings" ) -// Represents a selector. +// Selector represents a label selector. type Selector interface { - // Returns true if this selector matches the given set of labels. + // Matches returns true if this selector matches the given set of labels. Matches(Labels) bool - // Prints a human readable version of this selector. + // String returns a human readable string that represents this selector. String() string } @@ -87,7 +87,7 @@ func try(selectorPiece, op string) (lhs, rhs string, ok bool) { return "", "", false } -// Given a Set, return a Selector which will match exactly that Set. +// SelectorFromSet returns a Selector which will match exactly the given Set. func SelectorFromSet(ls Set) Selector { items := make([]Selector, 0, len(ls)) for label, value := range ls { @@ -99,7 +99,8 @@ func SelectorFromSet(ls Set) Selector { return andTerm(items) } -// Takes a string repsenting a selector and returns an object suitable for matching, or an error. +// ParseSelector takes a string repsenting a selector and returns an +// object suitable for matching, or an error. func ParseSelector(selector string) (Selector, error) { parts := strings.Split(selector, ",") sort.StringSlice(parts).Sort() diff --git a/pkg/master/master.go b/pkg/master/master.go index 74c40d0081b..e12fc7f28ec 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -43,7 +43,7 @@ type Master struct { storage map[string]apiserver.RESTStorage } -// Returns a memory (not etcd) backed apiserver. +// NewMemoryServer returns a new instance of Master backed with memory (not etcd). func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master { m := &Master{ podRegistry: registry.MakeMemoryRegistry(), @@ -55,7 +55,7 @@ func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud return m } -// Returns a new apiserver. +// New returns a new instance of Master connected to the given etcdServer. func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master { etcdClient := etcd.NewClient(etcdServers) minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp) @@ -84,7 +84,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30) go podCache.Loop() - s := scheduler.MakeRandomFitScheduler(m.podRegistry, m.random) + s := scheduler.NewRandomFitScheduler(m.podRegistry, m.random) m.storage = map[string]apiserver.RESTStorage{ "pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache), "replicationControllers": registry.NewControllerRegistryStorage(m.controllerRegistry, m.podRegistry), @@ -94,7 +94,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf } -// Runs master. Never returns. +// Run begins serving the Kubernetes API. It never returns. func (m *Master) Run(myAddress, apiPrefix string) error { endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) @@ -109,8 +109,9 @@ func (m *Master) Run(myAddress, apiPrefix string) error { return s.ListenAndServe() } -// Instead of calling Run, call ConstructHandler to get a handler for your own -// server. Intended for testing. Only call once. +// ConstructHandler returns an http.Handler which serves the Kubernetes API. +// Instead of calling Run, you can call this function to get a handler for your own server. +// It is intended for testing. Only call once. func (m *Master) ConstructHandler(apiPrefix string) http.Handler { endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index 14416af2ba5..c6be9a9f9dd 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -40,6 +40,7 @@ type PodCache struct { podLock sync.Mutex } +// NewPodCache returns a new PodCache which watches container information registered in the given PodRegistry. func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache { return &PodCache{ containerInfo: info, @@ -49,7 +50,7 @@ func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period ti } } -// Implements the PodInfoGetter interface. +// GetPodInfo Implements the PodInfoGetter.GetPodInfo. // The returned value should be treated as read-only. func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) { p.podLock.Lock() @@ -57,9 +58,8 @@ func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) { value, ok := p.podInfo[podID] if !ok { return nil, errors.New("no cached pod info") - } else { - return value, nil } + return value, nil } func (p *PodCache) updatePodInfo(host, id string) error { @@ -73,7 +73,7 @@ func (p *PodCache) updatePodInfo(host, id string) error { return nil } -// Update information about all containers. Either called by Loop() below, or one-off. +// UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off. func (p *PodCache) UpdateAllContainers() { pods, err := p.pods.ListPods(labels.Everything()) if err != nil { @@ -88,7 +88,8 @@ func (p *PodCache) UpdateAllContainers() { } } -// Loop runs forever, it is expected to be placed in a go routine. +// Loop begins watching updates of container information. +// It runs forever, and is expected to be placed in a go routine. func (p *PodCache) Loop() { util.Forever(func() { p.UpdateAllContainers() }, p.period) } diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 99db95f9b06..7f17d71d6f5 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -24,40 +24,44 @@ import ( "github.com/golang/glog" ) +// Operation is a type of operation of services or endpoints. type Operation int +// These are the available operation types. const ( SET Operation = iota ADD REMOVE ) -// Defines an operation sent on the channel. You can add or remove single services by -// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system -// to a given state for this source configuration, set Services as desired and Op to SET, -// which will reset the system state to that specified in this operation for this source -// channel. To remove all services, set Services to empty array and Op to SET +// ServiceUpdate describes an operation of services, sent on the channel. +// You can add or remove single services by sending an array of size one and Op == ADD|REMOVE. +// For setting the state of the system to a given state for this source configuration, set Services as desired and Op to SET, +// which will reset the system state to that specified in this operation for this source channel. +// To remove all services, set Services to empty array and Op to SET type ServiceUpdate struct { Services []api.Service Op Operation } -// Defines an operation sent on the channel. You can add or remove single endpoints by -// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system -// to a given state for this source configuration, set Endpoints as desired and Op to SET, -// which will reset the system state to that specified in this operation for this source -// channel. To remove all endpoints, set Endpoints to empty array and Op to SET +// EndpointsUpdate describes an operation of endpoints, sent on the channel. +// You can add or remove single endpoints by sending an array of size one and Op == ADD|REMOVE. +// For setting the state of the system to a given state for this source configuration, set Endpoints as desired and Op to SET, +// which will reset the system state to that specified in this operation for this source channel. +// To remove all endpoints, set Endpoints to empty array and Op to SET type EndpointsUpdate struct { Endpoints []api.Endpoints Op Operation } +// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. type ServiceConfigHandler interface { - // Sent when a configuration has been changed by one of the sources. This is the - // union of all the configuration sources. + // OnUpdate gets called when a configuration has been changed by one of the sources. + // This is the union of all the configuration sources. OnUpdate(services []api.Service) } +// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints. type EndpointsConfigHandler interface { // OnUpdate gets called when endpoints configuration is changed for a given // service on any of the configuration sources. An example is when a new @@ -65,6 +69,8 @@ type EndpointsConfigHandler interface { OnUpdate(endpoints []api.Endpoints) } +// ServiceConfig tracks a set of service configurations and their endpoint configurations. +// It accepts "set", "add" and "remove" operations of services and endpoints via channels, and invokes registered handlers on change. type ServiceConfig struct { // Configuration sources and their lock. configSourceLock sync.RWMutex @@ -94,6 +100,8 @@ type ServiceConfig struct { endpointsNotifyChannel chan string } +// NewServiceConfig creates a new ServiceConfig. +// It immediately runs the created ServiceConfig. func NewServiceConfig() *ServiceConfig { config := &ServiceConfig{ serviceConfigSources: make(map[string]chan ServiceUpdate), @@ -109,22 +117,26 @@ func NewServiceConfig() *ServiceConfig { return config } +// Run begins a loop to accept new service configurations and new endpoint configurations. +// It never returns. func (impl *ServiceConfig) Run() { glog.Infof("Starting the config Run loop") for { select { case source := <-impl.serviceNotifyChannel: glog.Infof("Got new service configuration from source %s", source) - impl.NotifyServiceUpdate() + impl.notifyServiceUpdate() case source := <-impl.endpointsNotifyChannel: glog.Infof("Got new endpoint configuration from source %s", source) - impl.NotifyEndpointsUpdate() + impl.notifyEndpointsUpdate() case <-time.After(1 * time.Second): } } } -func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel chan ServiceUpdate) { +// serviceChannelListener begins a loop to handle incoming ServiceUpdate notifications from the channel. +// It never returns. +func (impl *ServiceConfig) serviceChannelListener(source string, listenChannel chan ServiceUpdate) { // Represents the current services configuration for this channel. serviceMap := make(map[string]api.Service) for { @@ -160,7 +172,9 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c } } -func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel chan EndpointsUpdate) { +// endpointsChannelListener begins a loop to handle incoming EndpointsUpdate notifications from the channel. +// It never returns. +func (impl *ServiceConfig) endpointsChannelListener(source string, listenChannel chan EndpointsUpdate) { endpointMap := make(map[string]api.Endpoints) for { select { @@ -214,11 +228,11 @@ func (impl *ServiceConfig) GetServiceConfigurationChannel(source string) chan Se } newChannel := make(chan ServiceUpdate) impl.serviceConfigSources[source] = newChannel - go impl.ServiceChannelListener(source, newChannel) + go impl.serviceChannelListener(source, newChannel) return newChannel } -// GetEndpointConfigurationChannel returns a channel where a configuration source +// GetEndpointsConfigurationChannel returns a channel where a configuration source // can send updates of new endpoint configurations. Multiple calls with the same // source will return the same channel. This allows change and state based sources // to use the same channel. Difference source names however will be treated as a @@ -235,11 +249,11 @@ func (impl *ServiceConfig) GetEndpointsConfigurationChannel(source string) chan } newChannel := make(chan EndpointsUpdate) impl.endpointsConfigSources[source] = newChannel - go impl.EndpointsChannelListener(source, newChannel) + go impl.endpointsChannelListener(source, newChannel) return newChannel } -// Register ServiceConfigHandler to receive updates of changes to services. +// RegisterServiceHandler registers the ServiceConfigHandler to receive updates of changes to services. func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) { impl.handlerLock.Lock() defer impl.handlerLock.Unlock() @@ -255,7 +269,7 @@ func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) panic("Only up to 10 service handlers supported for now") } -// Register ServiceConfigHandler to receive updates of changes to services. +// RegisterEndpointsHandler registers the EndpointsConfigHandler to receive updates of changes to services. func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) { impl.handlerLock.Lock() defer impl.handlerLock.Unlock() @@ -271,8 +285,9 @@ func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandl panic("Only up to 10 endpoint handlers supported for now") } -func (impl *ServiceConfig) NotifyServiceUpdate() { - services := make([]api.Service, 0) +// notifyServiceUpdate calls the registered ServiceConfigHandlers with the current states of services. +func (impl *ServiceConfig) notifyServiceUpdate() { + services := []api.Service{} impl.configLock.RLock() for _, sourceServices := range impl.serviceConfig { for _, value := range sourceServices { @@ -291,8 +306,9 @@ func (impl *ServiceConfig) NotifyServiceUpdate() { } } -func (impl *ServiceConfig) NotifyEndpointsUpdate() { - endpoints := make([]api.Endpoints, 0) +// notifyEndpointsUpdate calls the registered EndpointsConfigHandlers with the current states of endpoints. +func (impl *ServiceConfig) notifyEndpointsUpdate() { + endpoints := []api.Endpoints{} impl.configLock.RLock() for _, sourceEndpoints := range impl.endpointConfig { for _, value := range sourceEndpoints { diff --git a/pkg/proxy/config/etcd.go b/pkg/proxy/config/etcd.go index 5b536f4e9ca..89488fca4f2 100644 --- a/pkg/proxy/config/etcd.go +++ b/pkg/proxy/config/etcd.go @@ -15,7 +15,7 @@ limitations under the License. */ // Watches etcd and gets the full configuration on preset intervals. -// Expects the list of exposed services to live under: +// It expects the list of exposed services to live under: // registry/services // which in etcd is exposed like so: // http:///v2/keys/registry/services @@ -30,7 +30,7 @@ limitations under the License. // '[ { "machine": , "name": }, // { "machine": , "name": } // ]', -// + package config import ( @@ -44,14 +44,17 @@ import ( "github.com/golang/glog" ) -const RegistryRoot = "registry/services" +// registryRoot is the key prefix for service configs in etcd. +const registryRoot = "registry/services" +// ConfigSourceEtcd communicates with a etcd via the client, and sends the change notification of services and endpoints to the specified channels. type ConfigSourceEtcd struct { client *etcd.Client serviceChannel chan ServiceUpdate endpointsChannel chan EndpointsUpdate } +// NewConfigSourceEtcd creates a new ConfigSourceEtcd and immediately runs the created ConfigSourceEtcd in a goroutine. func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd { config := ConfigSourceEtcd{ client: client, @@ -62,13 +65,14 @@ func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, return config } +// Run begins watching for new services and their endpoints on etcd. func (impl ConfigSourceEtcd) Run() { // Initially, just wait for the etcd to come up before doing anything more complicated. var services []api.Service var endpoints []api.Endpoints var err error for { - services, endpoints, err = impl.GetServices() + services, endpoints, err = impl.getServices() if err == nil { break } @@ -87,10 +91,10 @@ func (impl ConfigSourceEtcd) Run() { // Ok, so we got something back from etcd. Let's set up a watch for new services, and // their endpoints - go impl.WatchForChanges() + go impl.watchForChanges() for { - services, endpoints, err = impl.GetServices() + services, endpoints, err = impl.getServices() if err != nil { glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err) } else { @@ -107,12 +111,12 @@ func (impl ConfigSourceEtcd) Run() { } } -// Finds the list of services and their endpoints from etcd. +// getServices finds the list of services and their endpoints from etcd. // This operation is akin to a set a known good at regular intervals. -func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) { - response, err := impl.client.Get(RegistryRoot+"/specs", true, false) +func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, error) { + response, err := impl.client.Get(registryRoot+"/specs", true, false) if err != nil { - glog.Errorf("Failed to get the key %s: %v", RegistryRoot, err) + glog.Errorf("Failed to get the key %s: %v", registryRoot, err) return make([]api.Service, 0), make([]api.Endpoints, 0), err } if response.Node.Dir == true { @@ -129,7 +133,7 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro continue } retServices[i] = svc - endpoints, err := impl.GetEndpoints(svc.ID) + endpoints, err := impl.getEndpoints(svc.ID) if err != nil { glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err) } @@ -138,23 +142,23 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro } return retServices, retEndpoints, err } - return nil, nil, fmt.Errorf("did not get the root of the registry %s", RegistryRoot) + return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot) } -func (impl ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) { - key := fmt.Sprintf(RegistryRoot + "/endpoints/" + service) +// getEndpoints finds the list of endpoints of the service from etcd. +func (impl ConfigSourceEtcd) getEndpoints(service string) (api.Endpoints, error) { + key := fmt.Sprintf(registryRoot + "/endpoints/" + service) response, err := impl.client.Get(key, true, false) if err != nil { glog.Errorf("Failed to get the key: %s %v", key, err) return api.Endpoints{}, err } // Parse all the endpoint specifications in this value. - return ParseEndpoints(response.Node.Value) + return parseEndpoints(response.Node.Value) } -// EtcdResponseToServiceAndLocalport takes an etcd response and pulls it apart to find -// service -func EtcdResponseToService(response *etcd.Response) (*api.Service, error) { +// etcdResponseToService takes an etcd response and pulls it apart to find service. +func etcdResponseToService(response *etcd.Response) (*api.Service, error) { if response.Node == nil { return nil, fmt.Errorf("invalid response from etcd: %#v", response) } @@ -166,31 +170,31 @@ func EtcdResponseToService(response *etcd.Response) (*api.Service, error) { return &svc, err } -func ParseEndpoints(jsonString string) (api.Endpoints, error) { +func parseEndpoints(jsonString string) (api.Endpoints, error) { var e api.Endpoints err := json.Unmarshal([]byte(jsonString), &e) return e, err } -func (impl ConfigSourceEtcd) WatchForChanges() { +func (impl ConfigSourceEtcd) watchForChanges() { glog.Info("Setting up a watch for new services") watchChannel := make(chan *etcd.Response) go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil) for { watchResponse := <-watchChannel - impl.ProcessChange(watchResponse) + impl.processChange(watchResponse) } } -func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) { +func (impl ConfigSourceEtcd) processChange(response *etcd.Response) { glog.Infof("Processing a change in service configuration... %s", *response) // If it's a new service being added (signified by a localport being added) // then process it as such if strings.Contains(response.Node.Key, "/endpoints/") { - impl.ProcessEndpointResponse(response) + impl.processEndpointResponse(response) } else if response.Action == "set" { - service, err := EtcdResponseToService(response) + service, err := etcdResponseToService(response) if err != nil { glog.Errorf("Failed to parse %s Port: %s", response, err) return @@ -208,13 +212,12 @@ func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) { serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}} impl.serviceChannel <- serviceUpdate return - } else { - glog.Infof("Unknown service delete: %#v", parts) } + glog.Infof("Unknown service delete: %#v", parts) } } -func (impl ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) { +func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) { glog.Infof("Processing a change in endpoint configuration... %s", *response) var endpoints api.Endpoints err := json.Unmarshal([]byte(response.Node.Value), &endpoints) diff --git a/pkg/proxy/config/etcd_test.go b/pkg/proxy/config/etcd_test.go index 7c7a832f0df..9b1dafbcccb 100644 --- a/pkg/proxy/config/etcd_test.go +++ b/pkg/proxy/config/etcd_test.go @@ -26,15 +26,15 @@ import ( const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34" const TomcatService = "tomcat" -const TomcatContainerId = "tomcat-3bd5af34" +const TomcatContainerID = "tomcat-3bd5af34" -func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) { - endpoints, err := ParseEndpoints(jsonString) +func validateJSONParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) { + endpoints, err := parseEndpoints(jsonString) if err == nil && expectError { - t.Errorf("ValidateJsonParsing did not get expected error when parsing %s", jsonString) + t.Errorf("validateJSONParsing did not get expected error when parsing %s", jsonString) } if err != nil && !expectError { - t.Errorf("ValidateJsonParsing got unexpected error %+v when parsing %s", err, jsonString) + t.Errorf("validateJSONParsing got unexpected error %+v when parsing %s", err, jsonString) } if !reflect.DeepEqual(expectedEndpoints, endpoints) { t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints) @@ -42,7 +42,7 @@ func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api. } func TestParseJsonEndpoints(t *testing.T) { - ValidateJsonParsing(t, "", api.Endpoints{}, true) + validateJSONParsing(t, "", api.Endpoints{}, true) endpoints := api.Endpoints{ Name: "foo", Endpoints: []string{"foo", "bar", "baz"}, @@ -51,6 +51,6 @@ func TestParseJsonEndpoints(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %#v", err) } - ValidateJsonParsing(t, string(data), endpoints, false) - // ValidateJsonParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false) + validateJSONParsing(t, string(data), endpoints, false) + // validateJSONParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false) } diff --git a/pkg/proxy/config/file.go b/pkg/proxy/config/file.go index 65a1b9705bf..ac6810bdec3 100644 --- a/pkg/proxy/config/file.go +++ b/pkg/proxy/config/file.go @@ -28,6 +28,7 @@ limitations under the License. // } //] //} + package config import ( @@ -41,22 +42,23 @@ import ( "github.com/golang/glog" ) -// TODO: kill this struct. -type ServiceJSON struct { - Name string - Port int - Endpoints []string -} -type ConfigFile struct { - Services []ServiceJSON +// serviceConfig is a deserialized form of the config file format which ConfigSourceFile accepts. +type serviceConfig struct { + Services []struct { + Name string `json: "name"` + Port int `json: "port"` + Endpoints []string `json: "endpoints"` + } `json: "service"` } +// ConfigSourceFile periodically reads service configurations in JSON from a file, and sends the services and endpoints defined in the file to the specified channels. type ConfigSourceFile struct { serviceChannel chan ServiceUpdate endpointsChannel chan EndpointsUpdate filename string } +// NewConfigSourceFile creates a new ConfigSourceFile and let it immediately runs the created ConfigSourceFile in a goroutine. func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceFile { config := ConfigSourceFile{ filename: filename, @@ -67,6 +69,7 @@ func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, end return config } +// Run begins watching the config file. func (impl ConfigSourceFile) Run() { glog.Infof("Watching file %s", impl.filename) var lastData []byte @@ -85,7 +88,7 @@ func (impl ConfigSourceFile) Run() { } lastData = data - config := new(ConfigFile) + config := &serviceConfig{} if err = json.Unmarshal(data, config); err != nil { glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err) continue diff --git a/pkg/proxy/doc.go b/pkg/proxy/doc.go index e29caf01efe..7e209b66503 100644 --- a/pkg/proxy/doc.go +++ b/pkg/proxy/doc.go @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package proxy implements the layer-3 network proxy +// Package proxy implements the layer-3 network proxy. package proxy diff --git a/pkg/proxy/loadbalancer.go b/pkg/proxy/loadbalancer.go index 6481ee30bfd..6771d8bfcc6 100644 --- a/pkg/proxy/loadbalancer.go +++ b/pkg/proxy/loadbalancer.go @@ -22,6 +22,8 @@ import ( "net" ) +// LoadBalancer represents a load balancer that decides where to route +// the incoming services for a particular service to. type LoadBalancer interface { // LoadBalance takes an incoming request and figures out where to route it to. // Determination is based on destination service (for example, 'mysql') as diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index f83f3f9ba4e..a7822e56a29 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -33,11 +33,12 @@ type Proxier struct { serviceMap map[string]int } +// NewProxier returns a newly created and correctly initialized instance of Proxier. func NewProxier(loadBalancer LoadBalancer) *Proxier { return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)} } -func CopyBytes(in, out *net.TCPConn) { +func copyBytes(in, out *net.TCPConn) { glog.Infof("Copying from %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) _, err := io.Copy(in, out) @@ -49,14 +50,17 @@ func CopyBytes(in, out *net.TCPConn) { out.CloseWrite() } -// Create a bidirectional byte shuffler. Copies bytes to/from each connection. -func ProxyConnection(in, out *net.TCPConn) { +// proxyConnection creates a bidirectional byte shuffler. +// It copies bytes to/from each connection. +func proxyConnection(in, out *net.TCPConn) { glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) - go CopyBytes(in, out) - go CopyBytes(out, in) + go copyBytes(in, out) + go copyBytes(out, in) } +// AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints. +// It never returns. func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { for { inConn, err := listener.Accept() @@ -83,12 +87,12 @@ func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { inConn.Close() continue } - ProxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) + proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) } } -// AddService starts listening for a new service on a given port. -func (proxier Proxier) AddService(service string, port int) error { +// addService starts listening for a new service on a given port. +func (proxier Proxier) addService(service string, port int) error { // Make sure we can start listening on the port before saying all's well. l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { @@ -117,18 +121,21 @@ func (proxier Proxier) addServiceCommon(service string, l net.Listener) { go proxier.AcceptHandler(service, l) } +// OnUpdate recieves update notices for the updated services and start listening newly added services. +// It implements "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config".ServiceConfigHandler.OnUpdate. func (proxier Proxier) OnUpdate(services []api.Service) { glog.Infof("Received update notice: %+v", services) for _, service := range services { port, exists := proxier.serviceMap[service.ID] - if !exists || port != service.Port { - glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) - err := proxier.AddService(service.ID, service.Port) - if err == nil { - proxier.serviceMap[service.ID] = service.Port - } else { - glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port) - } + if exists && port == service.Port { + continue } + glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) + err := proxier.addService(service.ID, service.Port) + if err != nil { + glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port) + continue + } + proxier.serviceMap[service.ID] = service.Port } } diff --git a/pkg/proxy/roundrobbin.go b/pkg/proxy/roundrobbin.go index a55d10d5ccc..a5acebba7fc 100644 --- a/pkg/proxy/roundrobbin.go +++ b/pkg/proxy/roundrobbin.go @@ -29,22 +29,25 @@ import ( "github.com/golang/glog" ) +// LoadBalancerRR is a round-robin load balancer. It implements LoadBalancer. type LoadBalancerRR struct { lock sync.RWMutex endpointsMap map[string][]string rrIndex map[string]int } +// NewLoadBalancerRR returns a newly created and correctly initialized instance of LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)} } +// LoadBalance selects an endpoint of the service by round-robin algorithm. func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) { impl.lock.RLock() endpoints, exists := impl.endpointsMap[service] index := impl.rrIndex[service] impl.lock.RUnlock() - if exists == false { + if !exists { return "", errors.New("no service entry for:" + service) } if len(endpoints) == 0 { @@ -55,7 +58,7 @@ func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string return endpoint, nil } -func (impl LoadBalancerRR) IsValid(spec string) bool { +func (impl LoadBalancerRR) isValid(spec string) bool { _, port, err := net.SplitHostPort(spec) if err != nil { return false @@ -67,16 +70,19 @@ func (impl LoadBalancerRR) IsValid(spec string) bool { return value > 0 } -func (impl LoadBalancerRR) FilterValidEndpoints(endpoints []string) []string { +func (impl LoadBalancerRR) filterValidEndpoints(endpoints []string) []string { var result []string for _, spec := range endpoints { - if impl.IsValid(spec) { + if impl.isValid(spec) { result = append(result, spec) } } return result } +// OnUpdate updates the registered endpoints with the new +// endpoint information, removes the registered endpoints +// no longer present in the provided endpoints. func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { tmp := make(map[string]bool) impl.lock.Lock() @@ -84,7 +90,7 @@ func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { // First update / add all new endpoints for services. for _, value := range endpoints { existingEndpoints, exists := impl.endpointsMap[value.Name] - validEndpoints := impl.FilterValidEndpoints(value.Endpoints) + validEndpoints := impl.filterValidEndpoints(value.Endpoints) if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) { glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints) impl.endpointsMap[value.Name] = validEndpoints diff --git a/pkg/proxy/roundrobbin_test.go b/pkg/proxy/roundrobbin_test.go index b4af90a9819..1112141de80 100644 --- a/pkg/proxy/roundrobbin_test.go +++ b/pkg/proxy/roundrobbin_test.go @@ -24,16 +24,16 @@ import ( func TestLoadBalanceValidateWorks(t *testing.T) { loadBalancer := NewLoadBalancerRR() - if loadBalancer.IsValid("") { + if loadBalancer.isValid("") { t.Errorf("Didn't fail for empty string") } - if loadBalancer.IsValid("foobar") { + if loadBalancer.isValid("foobar") { t.Errorf("Didn't fail with no port") } - if loadBalancer.IsValid("foobar:-1") { + if loadBalancer.isValid("foobar:-1") { t.Errorf("Didn't fail with a negative port") } - if !loadBalancer.IsValid("foobar:8080") { + if !loadBalancer.isValid("foobar:8080") { t.Errorf("Failed a valid config.") } } @@ -41,7 +41,7 @@ func TestLoadBalanceValidateWorks(t *testing.T) { func TestLoadBalanceFilterWorks(t *testing.T) { loadBalancer := NewLoadBalancerRR() endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"} - filtered := loadBalancer.FilterValidEndpoints(endpoints) + filtered := loadBalancer.filterValidEndpoints(endpoints) if len(filtered) != 3 { t.Errorf("Failed to filter to the correct size") @@ -59,7 +59,7 @@ func TestLoadBalanceFilterWorks(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoints := make([]api.Endpoints, 0) + var endpoints []api.Endpoints loadBalancer.OnUpdate(endpoints) endpoint, err := loadBalancer.LoadBalance("foo", nil) if err == nil { diff --git a/pkg/scheduler/randomfit.go b/pkg/scheduler/randomfit.go index 20193fde255..b198f9c1c2b 100644 --- a/pkg/scheduler/randomfit.go +++ b/pkg/scheduler/randomfit.go @@ -32,7 +32,7 @@ type RandomFitScheduler struct { randomLock sync.Mutex } -func MakeRandomFitScheduler(podLister PodLister, random *rand.Rand) Scheduler { +func NewRandomFitScheduler(podLister PodLister, random *rand.Rand) Scheduler { return &RandomFitScheduler{ podLister: podLister, random: random, diff --git a/pkg/scheduler/randomfit_test.go b/pkg/scheduler/randomfit_test.go index 1cdf505b9e1..642179249e8 100644 --- a/pkg/scheduler/randomfit_test.go +++ b/pkg/scheduler/randomfit_test.go @@ -28,7 +28,7 @@ func TestRandomFitSchedulerNothingScheduled(t *testing.T) { r := rand.New(rand.NewSource(0)) st := schedulerTester{ t: t, - scheduler: MakeRandomFitScheduler(&fakeRegistry, r), + scheduler: NewRandomFitScheduler(&fakeRegistry, r), minionLister: FakeMinionLister{"m1", "m2", "m3"}, } st.expectSchedule(api.Pod{}, "m3") @@ -41,7 +41,7 @@ func TestRandomFitSchedulerFirstScheduled(t *testing.T) { r := rand.New(rand.NewSource(0)) st := schedulerTester{ t: t, - scheduler: MakeRandomFitScheduler(fakeRegistry, r), + scheduler: NewRandomFitScheduler(fakeRegistry, r), minionLister: FakeMinionLister{"m1", "m2", "m3"}, } st.expectSchedule(makePod("", 8080), "m3") @@ -56,7 +56,7 @@ func TestRandomFitSchedulerFirstScheduledComplicated(t *testing.T) { r := rand.New(rand.NewSource(0)) st := schedulerTester{ t: t, - scheduler: MakeRandomFitScheduler(fakeRegistry, r), + scheduler: NewRandomFitScheduler(fakeRegistry, r), minionLister: FakeMinionLister{"m1", "m2", "m3"}, } st.expectSchedule(makePod("", 8080, 8081), "m3") @@ -71,7 +71,7 @@ func TestRandomFitSchedulerFirstScheduledImpossible(t *testing.T) { r := rand.New(rand.NewSource(0)) st := schedulerTester{ t: t, - scheduler: MakeRandomFitScheduler(fakeRegistry, r), + scheduler: NewRandomFitScheduler(fakeRegistry, r), minionLister: FakeMinionLister{"m1", "m2", "m3"}, } st.expectFailure(makePod("", 8080, 8081))