From f4d989ae929b3856ba6a911135638de7b28e42de Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Thu, 10 Jul 2014 22:07:05 -0700 Subject: [PATCH 01/10] labels: Fixing linting errors. --- pkg/labels/labels.go | 10 +++++----- pkg/labels/selector.go | 11 ++++++----- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/labels/labels.go b/pkg/labels/labels.go index 351289ea639..63c3992cebd 100644 --- a/pkg/labels/labels.go +++ b/pkg/labels/labels.go @@ -26,11 +26,11 @@ type Labels interface { Get(label string) (value string) } -// A map of label:value. Implements Labels. +// Set is a map of label:value. Implements Labels. type Set map[string]string -// All labels listed as a human readable string. Conveniently, exactly the format -// that ParseSelector takes. +// String returns all labels listed as a human readable string. +// Conveniently, exactly the format that ParseSelector takes. func (ls Set) String() string { selector := make([]string, 0, len(ls)) for key, value := range ls { @@ -41,12 +41,12 @@ func (ls Set) String() string { return strings.Join(selector, ",") } -// Implement Labels interface. +// Get returns the value for the provided label. func (ls Set) Get(label string) string { return ls[label] } -// Convenience function: convert these labels to a selector. +// AsSelector converts labels into a selectors. func (ls Set) AsSelector() Selector { return SelectorFromSet(ls) } diff --git a/pkg/labels/selector.go b/pkg/labels/selector.go index 668699ee8ab..215ae479561 100644 --- a/pkg/labels/selector.go +++ b/pkg/labels/selector.go @@ -22,12 +22,12 @@ import ( "strings" ) -// Represents a selector. +// Selector represents a label selector. type Selector interface { - // Returns true if this selector matches the given set of labels. + // Matches returns true if this selector matches the given set of labels. Matches(Labels) bool - // Prints a human readable version of this selector. + // String returns a human readable string that represents this selector. String() string } @@ -87,7 +87,7 @@ func try(selectorPiece, op string) (lhs, rhs string, ok bool) { return "", "", false } -// Given a Set, return a Selector which will match exactly that Set. +// SelectorFromSet converts a Set into a Selector. func SelectorFromSet(ls Set) Selector { var items []Selector for label, value := range ls { @@ -99,7 +99,8 @@ func SelectorFromSet(ls Set) Selector { return andTerm(items) } -// Takes a string repsenting a selector and returns an object suitable for matching, or an error. +// ParseSelector takes a string repsenting a selector and returns an +// object suitable for matching, or an error. func ParseSelector(selector string) (Selector, error) { parts := strings.Split(selector, ",") sort.StringSlice(parts).Sort() From e4d966744d462d0b0d47dcf5867aa339fc247ae5 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Thu, 10 Jul 2014 22:36:06 -0700 Subject: [PATCH 02/10] proxy: fixing linting errors. --- pkg/proxy/doc.go | 2 +- pkg/proxy/loadbalancer.go | 2 ++ pkg/proxy/proxier.go | 6 +++++- pkg/proxy/roundrobbin.go | 15 +++++++++++++-- pkg/proxy/roundrobbin_test.go | 2 +- 5 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/proxy/doc.go b/pkg/proxy/doc.go index e29caf01efe..7e209b66503 100644 --- a/pkg/proxy/doc.go +++ b/pkg/proxy/doc.go @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package proxy implements the layer-3 network proxy +// Package proxy implements the layer-3 network proxy. package proxy diff --git a/pkg/proxy/loadbalancer.go b/pkg/proxy/loadbalancer.go index 6481ee30bfd..6771d8bfcc6 100644 --- a/pkg/proxy/loadbalancer.go +++ b/pkg/proxy/loadbalancer.go @@ -22,6 +22,8 @@ import ( "net" ) +// LoadBalancer represents a load balancer that decides where to route +// the incoming services for a particular service to. type LoadBalancer interface { // LoadBalance takes an incoming request and figures out where to route it to. // Determination is based on destination service (for example, 'mysql') as diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 6c1a045a769..5e781456d5e 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -33,10 +33,12 @@ type Proxier struct { serviceMap map[string]int } +// NewProxier returns a new Proxier. func NewProxier(loadBalancer LoadBalancer) *Proxier { return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)} } +// CopyBytes copies bytes from in to out until EOF. func CopyBytes(in, out *net.TCPConn) { glog.Infof("Copying from %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) @@ -49,7 +51,8 @@ func CopyBytes(in, out *net.TCPConn) { out.CloseWrite() } -// Create a bidirectional byte shuffler. Copies bytes to/from each connection. +// ProxyConnection creates a bidirectional byte shuffler. +// Copies bytes to/from each connection. func ProxyConnection(in, out *net.TCPConn) { glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) @@ -117,6 +120,7 @@ func (proxier Proxier) addServiceCommon(service string, l net.Listener) { go proxier.AcceptHandler(service, l) } +// OnUpdate handles update notices for the updated services. func (proxier Proxier) OnUpdate(services []api.Service) { glog.Infof("Received update notice: %+v", services) for _, service := range services { diff --git a/pkg/proxy/roundrobbin.go b/pkg/proxy/roundrobbin.go index 9d9f2413540..a55d812ae6d 100644 --- a/pkg/proxy/roundrobbin.go +++ b/pkg/proxy/roundrobbin.go @@ -30,22 +30,28 @@ import ( "github.com/golang/glog" ) +// LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { - lock sync.RWMutex endpointsMap map[string][]string rrIndex map[string]int + + lock sync.RWMutex } +// NewLoadBalancerRR returns a new LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)} } +// LoadBalance registers srcAddr for the provided service. +// It returns error if no entry is available for the service +// or no previous endpoints are registered. func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) { impl.lock.RLock() endpoints, exists := impl.endpointsMap[service] index := impl.rrIndex[service] impl.lock.RUnlock() - if exists == false { + if !exists { return "", errors.New("no service entry for:" + service) } if len(endpoints) == 0 { @@ -56,6 +62,7 @@ func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string return endpoint, nil } +// IsValid returns true if spec is valid. func (impl LoadBalancerRR) IsValid(spec string) bool { index := strings.Index(spec, ":") if index == -1 { @@ -68,6 +75,7 @@ func (impl LoadBalancerRR) IsValid(spec string) bool { return value > 0 } +// FilterValidEndpoints filters out invalid endpoints. func (impl LoadBalancerRR) FilterValidEndpoints(endpoints []string) []string { var result []string for _, spec := range endpoints { @@ -78,6 +86,9 @@ func (impl LoadBalancerRR) FilterValidEndpoints(endpoints []string) []string { return result } +// OnUpdate updates the registered endpoints with the new +// endpoint information, removes the registered endpoints +// no longer present in the provided endpoints. func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { tmp := make(map[string]bool) impl.lock.Lock() diff --git a/pkg/proxy/roundrobbin_test.go b/pkg/proxy/roundrobbin_test.go index b4af90a9819..216cdbd2de0 100644 --- a/pkg/proxy/roundrobbin_test.go +++ b/pkg/proxy/roundrobbin_test.go @@ -59,7 +59,7 @@ func TestLoadBalanceFilterWorks(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoints := make([]api.Endpoints, 0) + var endpoints []api.Endpoints loadBalancer.OnUpdate(endpoints) endpoint, err := loadBalancer.LoadBalance("foo", nil) if err == nil { From 6a2703627be0a5e9f8f1c7005cd384561b72644c Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Thu, 10 Jul 2014 22:56:14 -0700 Subject: [PATCH 03/10] scheduler: use New rather than Make for construction helpers. --- pkg/master/master.go | 2 +- pkg/scheduler/firstfit.go | 2 +- pkg/scheduler/firstfit_test.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/master/master.go b/pkg/master/master.go index 5b17457fcb3..b3328c9d59a 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -84,7 +84,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf m.random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) podCache := NewPodCache(podInfoGetter, m.podRegistry, time.Second*30) go podCache.Loop() - s := scheduler.MakeFirstFitScheduler(m.podRegistry, m.random) + s := scheduler.NewFirstFitScheduler(m.podRegistry, m.random) m.storage = map[string]apiserver.RESTStorage{ "pods": registry.MakePodRegistryStorage(m.podRegistry, podInfoGetter, s, m.minionRegistry, cloud, podCache), "replicationControllers": registry.MakeControllerRegistryStorage(m.controllerRegistry, m.podRegistry), diff --git a/pkg/scheduler/firstfit.go b/pkg/scheduler/firstfit.go index e98c9a96932..120163fcfb7 100644 --- a/pkg/scheduler/firstfit.go +++ b/pkg/scheduler/firstfit.go @@ -30,7 +30,7 @@ type FirstFitScheduler struct { random *rand.Rand } -func MakeFirstFitScheduler(podLister PodLister, random *rand.Rand) Scheduler { +func NewFirstFitScheduler(podLister PodLister, random *rand.Rand) Scheduler { return &FirstFitScheduler{ podLister: podLister, random: random, diff --git a/pkg/scheduler/firstfit_test.go b/pkg/scheduler/firstfit_test.go index 57e8444b0e1..1226d2547d1 100644 --- a/pkg/scheduler/firstfit_test.go +++ b/pkg/scheduler/firstfit_test.go @@ -28,7 +28,7 @@ func TestFirstFitSchedulerNothingScheduled(t *testing.T) { r := rand.New(rand.NewSource(0)) st := schedulerTester{ t: t, - scheduler: MakeFirstFitScheduler(&fakeRegistry, r), + scheduler: NewFirstFitScheduler(&fakeRegistry, r), minionLister: FakeMinionLister{"m1", "m2", "m3"}, } st.expectSchedule(api.Pod{}, "m3") @@ -41,7 +41,7 @@ func TestFirstFitSchedulerFirstScheduled(t *testing.T) { r := rand.New(rand.NewSource(0)) st := schedulerTester{ t: t, - scheduler: MakeFirstFitScheduler(fakeRegistry, r), + scheduler: NewFirstFitScheduler(fakeRegistry, r), minionLister: FakeMinionLister{"m1", "m2", "m3"}, } st.expectSchedule(makePod("", 8080), "m3") @@ -56,7 +56,7 @@ func TestFirstFitSchedulerFirstScheduledComplicated(t *testing.T) { r := rand.New(rand.NewSource(0)) st := schedulerTester{ t: t, - scheduler: MakeFirstFitScheduler(fakeRegistry, r), + scheduler: NewFirstFitScheduler(fakeRegistry, r), minionLister: FakeMinionLister{"m1", "m2", "m3"}, } st.expectSchedule(makePod("", 8080, 8081), "m3") @@ -71,7 +71,7 @@ func TestFirstFitSchedulerFirstScheduledImpossible(t *testing.T) { r := rand.New(rand.NewSource(0)) st := schedulerTester{ t: t, - scheduler: MakeFirstFitScheduler(fakeRegistry, r), + scheduler: NewFirstFitScheduler(fakeRegistry, r), minionLister: FakeMinionLister{"m1", "m2", "m3"}, } st.expectFailure(makePod("", 8080, 8081)) From 1816a63d55af2938f823ba844331638bcd7c3098 Mon Sep 17 00:00:00 2001 From: Burcu Dogan Date: Fri, 11 Jul 2014 00:23:43 -0700 Subject: [PATCH 04/10] proxy: lock should be above the fields it protects. --- pkg/proxy/roundrobbin.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/proxy/roundrobbin.go b/pkg/proxy/roundrobbin.go index a55d812ae6d..a5474843737 100644 --- a/pkg/proxy/roundrobbin.go +++ b/pkg/proxy/roundrobbin.go @@ -32,10 +32,9 @@ import ( // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { + lock sync.RWMutex endpointsMap map[string][]string rrIndex map[string]int - - lock sync.RWMutex } // NewLoadBalancerRR returns a new LoadBalancerRR. From 85fa11da9389686840e95445173d4c4f1b52b0e5 Mon Sep 17 00:00:00 2001 From: Yuki Yugui Sonoda Date: Fri, 11 Jul 2014 19:10:15 +0900 Subject: [PATCH 05/10] Fixes golint errors in pkg/labels --- pkg/labels/labels.go | 3 ++- pkg/labels/selector.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/labels/labels.go b/pkg/labels/labels.go index 63c3992cebd..5c980d4dea0 100644 --- a/pkg/labels/labels.go +++ b/pkg/labels/labels.go @@ -23,10 +23,11 @@ import ( // Labels allows you to present labels independently from their storage. type Labels interface { + // Get returns the value identified by the label. Get(label string) (value string) } -// Set is a map of label:value. Implements Labels. +// Set is a map of label:value. It implements Labels. type Set map[string]string // String returns all labels listed as a human readable string. diff --git a/pkg/labels/selector.go b/pkg/labels/selector.go index 215ae479561..f22adcf3d8d 100644 --- a/pkg/labels/selector.go +++ b/pkg/labels/selector.go @@ -87,7 +87,7 @@ func try(selectorPiece, op string) (lhs, rhs string, ok bool) { return "", "", false } -// SelectorFromSet converts a Set into a Selector. +// SelectorFromSet returns a Selector which will match exactly the given Set. func SelectorFromSet(ls Set) Selector { var items []Selector for label, value := range ls { From 88284171f2730664c4829966f61415805d328523 Mon Sep 17 00:00:00 2001 From: Yuki Yugui Sonoda Date: Fri, 11 Jul 2014 19:10:24 +0900 Subject: [PATCH 06/10] Fixes golint errors in pkg/master --- pkg/master/master.go | 11 ++++++----- pkg/master/pod_cache.go | 8 ++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pkg/master/master.go b/pkg/master/master.go index b3328c9d59a..cda7ba4f11d 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -43,7 +43,7 @@ type Master struct { storage map[string]apiserver.RESTStorage } -// Returns a memory (not etcd) backed apiserver. +// NewMemoryServer returns a memory (not etcd) backed apiserver. func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master { m := &Master{ podRegistry: registry.MakeMemoryRegistry(), @@ -55,7 +55,7 @@ func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud return m } -// Returns a new apiserver. +// New returns a new apiserver. func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master { etcdClient := etcd.NewClient(etcdServers) minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp) @@ -94,7 +94,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf } -// Runs master. Never returns. +// Run runs master. Never returns. func (m *Master) Run(myAddress, apiPrefix string) error { endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) @@ -109,8 +109,9 @@ func (m *Master) Run(myAddress, apiPrefix string) error { return s.ListenAndServe() } -// Instead of calling Run, call ConstructHandler to get a handler for your own -// server. Intended for testing. Only call once. +// ConstructHandler returns an http.Handler which serves Kubernetes API. +// Instead of calling Run, you can call this function to get a handler for your own server. +// Intended for testing. Only call once. func (m *Master) ConstructHandler(apiPrefix string) http.Handler { endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index d075e6cde57..706f6edd0f9 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -40,6 +40,7 @@ type PodCache struct { podLock sync.Mutex } +// NewPodCache returns a new PodCache. func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache { return &PodCache{ containerInfo: info, @@ -49,7 +50,7 @@ func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period ti } } -// Implements the PodInfoGetter interface. +// GetPodInfo Implements the PodInfoGetter.GetPodInfo. // The returned value should be treated as read-only. func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) { p.podLock.Lock() @@ -57,9 +58,8 @@ func (p *PodCache) GetPodInfo(host, podID string) (api.PodInfo, error) { value, ok := p.podInfo[podID] if !ok { return nil, errors.New("No cached pod info") - } else { - return value, nil } + return value, nil } func (p *PodCache) updatePodInfo(host, id string) error { @@ -73,7 +73,7 @@ func (p *PodCache) updatePodInfo(host, id string) error { return nil } -// Update information about all containers. Either called by Loop() below, or one-off. +// UpdateAllContainers updates information about all containers. Either called by Loop() below, or one-off. func (p *PodCache) UpdateAllContainers() { pods, err := p.pods.ListPods(labels.Everything()) if err != nil { From 7373695e33cb98492a6d8d2a7ae4a4a944da4035 Mon Sep 17 00:00:00 2001 From: Yuki Yugui Sonoda Date: Fri, 11 Jul 2014 20:48:18 +0900 Subject: [PATCH 07/10] Fixes golint errors in pkg/proxy --- pkg/proxy/config/config.go | 66 ++++++++++++++++++++++------------- pkg/proxy/config/etcd.go | 58 ++++++++++++++++-------------- pkg/proxy/config/etcd_test.go | 16 ++++----- pkg/proxy/config/file.go | 22 +++++++----- pkg/proxy/proxier.go | 24 +++++++------ pkg/proxy/roundrobbin.go | 18 +++++----- pkg/proxy/roundrobbin_test.go | 10 +++--- 7 files changed, 119 insertions(+), 95 deletions(-) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 99db95f9b06..9ceec048b04 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -24,40 +24,44 @@ import ( "github.com/golang/glog" ) +// Operation is a type of operation of services or endpoints. type Operation int +// These are the available operation types. const ( SET Operation = iota ADD REMOVE ) -// Defines an operation sent on the channel. You can add or remove single services by -// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system -// to a given state for this source configuration, set Services as desired and Op to SET, -// which will reset the system state to that specified in this operation for this source -// channel. To remove all services, set Services to empty array and Op to SET +// ServiceUpdate describes an operation of services, sent on the channel. +// You can add or remove single services by sending an array of size one and Op == ADD|REMOVE. +// For setting the state of the system to a given state for this source configuration, set Services as desired and Op to SET, +// which will reset the system state to that specified in this operation for this source channel. +// To remove all services, set Services to empty array and Op to SET type ServiceUpdate struct { Services []api.Service Op Operation } -// Defines an operation sent on the channel. You can add or remove single endpoints by -// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system -// to a given state for this source configuration, set Endpoints as desired and Op to SET, -// which will reset the system state to that specified in this operation for this source -// channel. To remove all endpoints, set Endpoints to empty array and Op to SET +// EndpointsUpdate describes an operation of endpoints, sent on the channel. +// You can add or remove single endpoints by sending an array of size one and Op == ADD|REMOVE. +// For setting the state of the system to a given state for this source configuration, set Endpoints as desired and Op to SET, +// which will reset the system state to that specified in this operation for this source channel. +// To remove all endpoints, set Endpoints to empty array and Op to SET type EndpointsUpdate struct { Endpoints []api.Endpoints Op Operation } +// ServiceConfigHandler handles update notifications of the set of services. type ServiceConfigHandler interface { - // Sent when a configuration has been changed by one of the sources. This is the - // union of all the configuration sources. + // OnUpdate gets called when a configuration has been changed by one of the sources. + // This is the union of all the configuration sources. OnUpdate(services []api.Service) } +// EndpointsConfigHandler handles update notifications of the set of endpoints. type EndpointsConfigHandler interface { // OnUpdate gets called when endpoints configuration is changed for a given // service on any of the configuration sources. An example is when a new @@ -65,6 +69,8 @@ type EndpointsConfigHandler interface { OnUpdate(endpoints []api.Endpoints) } +// ServiceConfig tracks a set of service configurations and their endpoint configurations. +// It accepts "set", "add" and "remove" operations of services and endpoints via channels, and invokes registered handlers on change. type ServiceConfig struct { // Configuration sources and their lock. configSourceLock sync.RWMutex @@ -94,6 +100,8 @@ type ServiceConfig struct { endpointsNotifyChannel chan string } +// NewServiceConfig creates a new ServiceConfig. +// It immediately runs the created ServiceConfig. func NewServiceConfig() *ServiceConfig { config := &ServiceConfig{ serviceConfigSources: make(map[string]chan ServiceUpdate), @@ -109,22 +117,26 @@ func NewServiceConfig() *ServiceConfig { return config } +// Run begins a loop to accept new service configurations and new endpoint configurations. +// It never returns. func (impl *ServiceConfig) Run() { glog.Infof("Starting the config Run loop") for { select { case source := <-impl.serviceNotifyChannel: glog.Infof("Got new service configuration from source %s", source) - impl.NotifyServiceUpdate() + impl.notifyServiceUpdate() case source := <-impl.endpointsNotifyChannel: glog.Infof("Got new endpoint configuration from source %s", source) - impl.NotifyEndpointsUpdate() + impl.notifyEndpointsUpdate() case <-time.After(1 * time.Second): } } } -func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel chan ServiceUpdate) { +// serviceChannelListener begins a loop to handle incoming ServiceUpdate notifications from the channel. +// It never returns. +func (impl *ServiceConfig) serviceChannelListener(source string, listenChannel chan ServiceUpdate) { // Represents the current services configuration for this channel. serviceMap := make(map[string]api.Service) for { @@ -160,7 +172,9 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c } } -func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel chan EndpointsUpdate) { +// endpointsChannelListener begins a loop to handle incoming EndpointsUpdate notifications from the channel. +// It never returns. +func (impl *ServiceConfig) endpointsChannelListener(source string, listenChannel chan EndpointsUpdate) { endpointMap := make(map[string]api.Endpoints) for { select { @@ -214,11 +228,11 @@ func (impl *ServiceConfig) GetServiceConfigurationChannel(source string) chan Se } newChannel := make(chan ServiceUpdate) impl.serviceConfigSources[source] = newChannel - go impl.ServiceChannelListener(source, newChannel) + go impl.serviceChannelListener(source, newChannel) return newChannel } -// GetEndpointConfigurationChannel returns a channel where a configuration source +// GetEndpointsConfigurationChannel returns a channel where a configuration source // can send updates of new endpoint configurations. Multiple calls with the same // source will return the same channel. This allows change and state based sources // to use the same channel. Difference source names however will be treated as a @@ -235,11 +249,11 @@ func (impl *ServiceConfig) GetEndpointsConfigurationChannel(source string) chan } newChannel := make(chan EndpointsUpdate) impl.endpointsConfigSources[source] = newChannel - go impl.EndpointsChannelListener(source, newChannel) + go impl.endpointsChannelListener(source, newChannel) return newChannel } -// Register ServiceConfigHandler to receive updates of changes to services. +// RegisterServiceHandler registers the ServiceConfigHandler to receive updates of changes to services. func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) { impl.handlerLock.Lock() defer impl.handlerLock.Unlock() @@ -255,7 +269,7 @@ func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) panic("Only up to 10 service handlers supported for now") } -// Register ServiceConfigHandler to receive updates of changes to services. +// RegisterEndpointsHandler registers the EndpointsConfigHandler to receive updates of changes to services. func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) { impl.handlerLock.Lock() defer impl.handlerLock.Unlock() @@ -271,8 +285,9 @@ func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandl panic("Only up to 10 endpoint handlers supported for now") } -func (impl *ServiceConfig) NotifyServiceUpdate() { - services := make([]api.Service, 0) +// notifyServiceUpdate calls the registered ServiceConfigHandlers with the current states of services. +func (impl *ServiceConfig) notifyServiceUpdate() { + services := []api.Service{} impl.configLock.RLock() for _, sourceServices := range impl.serviceConfig { for _, value := range sourceServices { @@ -291,8 +306,9 @@ func (impl *ServiceConfig) NotifyServiceUpdate() { } } -func (impl *ServiceConfig) NotifyEndpointsUpdate() { - endpoints := make([]api.Endpoints, 0) +// notifyEndpointsUpdate calls the registered EndpointsConfigHandlers with the current states of endpoints. +func (impl *ServiceConfig) notifyEndpointsUpdate() { + endpoints := []api.Endpoints{} impl.configLock.RLock() for _, sourceEndpoints := range impl.endpointConfig { for _, value := range sourceEndpoints { diff --git a/pkg/proxy/config/etcd.go b/pkg/proxy/config/etcd.go index 5b536f4e9ca..cac508a649b 100644 --- a/pkg/proxy/config/etcd.go +++ b/pkg/proxy/config/etcd.go @@ -15,7 +15,7 @@ limitations under the License. */ // Watches etcd and gets the full configuration on preset intervals. -// Expects the list of exposed services to live under: +// It expects the list of exposed services to live under: // registry/services // which in etcd is exposed like so: // http:///v2/keys/registry/services @@ -30,7 +30,7 @@ limitations under the License. // '[ { "machine": , "name": }, // { "machine": , "name": } // ]', -// + package config import ( @@ -44,14 +44,18 @@ import ( "github.com/golang/glog" ) -const RegistryRoot = "registry/services" +// registryRoot is the key prefix for service configs in etcd. +const registryRoot = "registry/services" +// ConfigSourceEtcd communicates with a etcd via the client, and sends the change notification of services and endpoints to the specified channels. type ConfigSourceEtcd struct { client *etcd.Client serviceChannel chan ServiceUpdate endpointsChannel chan EndpointsUpdate } +// NewConfigSourceEtcd creates a new ConfigSourceEtcd. +// It immediately runs the created ConfigSourceEtcd in a goroutine. func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd { config := ConfigSourceEtcd{ client: client, @@ -62,13 +66,14 @@ func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, return config } +// Run begins watching for new services and their endpoints on etcd. func (impl ConfigSourceEtcd) Run() { // Initially, just wait for the etcd to come up before doing anything more complicated. var services []api.Service var endpoints []api.Endpoints var err error for { - services, endpoints, err = impl.GetServices() + services, endpoints, err = impl.getServices() if err == nil { break } @@ -87,10 +92,10 @@ func (impl ConfigSourceEtcd) Run() { // Ok, so we got something back from etcd. Let's set up a watch for new services, and // their endpoints - go impl.WatchForChanges() + go impl.watchForChanges() for { - services, endpoints, err = impl.GetServices() + services, endpoints, err = impl.getServices() if err != nil { glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err) } else { @@ -107,12 +112,12 @@ func (impl ConfigSourceEtcd) Run() { } } -// Finds the list of services and their endpoints from etcd. +// getServices finds the list of services and their endpoints from etcd. // This operation is akin to a set a known good at regular intervals. -func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) { - response, err := impl.client.Get(RegistryRoot+"/specs", true, false) +func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, error) { + response, err := impl.client.Get(registryRoot+"/specs", true, false) if err != nil { - glog.Errorf("Failed to get the key %s: %v", RegistryRoot, err) + glog.Errorf("Failed to get the key %s: %v", registryRoot, err) return make([]api.Service, 0), make([]api.Endpoints, 0), err } if response.Node.Dir == true { @@ -129,7 +134,7 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro continue } retServices[i] = svc - endpoints, err := impl.GetEndpoints(svc.ID) + endpoints, err := impl.getEndpoints(svc.ID) if err != nil { glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err) } @@ -138,23 +143,23 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro } return retServices, retEndpoints, err } - return nil, nil, fmt.Errorf("did not get the root of the registry %s", RegistryRoot) + return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot) } -func (impl ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) { - key := fmt.Sprintf(RegistryRoot + "/endpoints/" + service) +// getEndpoints finds the list of endpoints of the service from etcd. +func (impl ConfigSourceEtcd) getEndpoints(service string) (api.Endpoints, error) { + key := fmt.Sprintf(registryRoot + "/endpoints/" + service) response, err := impl.client.Get(key, true, false) if err != nil { glog.Errorf("Failed to get the key: %s %v", key, err) return api.Endpoints{}, err } // Parse all the endpoint specifications in this value. - return ParseEndpoints(response.Node.Value) + return parseEndpoints(response.Node.Value) } -// EtcdResponseToServiceAndLocalport takes an etcd response and pulls it apart to find -// service -func EtcdResponseToService(response *etcd.Response) (*api.Service, error) { +// etcdResponseToService takes an etcd response and pulls it apart to find service. +func etcdResponseToService(response *etcd.Response) (*api.Service, error) { if response.Node == nil { return nil, fmt.Errorf("invalid response from etcd: %#v", response) } @@ -166,31 +171,31 @@ func EtcdResponseToService(response *etcd.Response) (*api.Service, error) { return &svc, err } -func ParseEndpoints(jsonString string) (api.Endpoints, error) { +func parseEndpoints(jsonString string) (api.Endpoints, error) { var e api.Endpoints err := json.Unmarshal([]byte(jsonString), &e) return e, err } -func (impl ConfigSourceEtcd) WatchForChanges() { +func (impl ConfigSourceEtcd) watchForChanges() { glog.Info("Setting up a watch for new services") watchChannel := make(chan *etcd.Response) go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil) for { watchResponse := <-watchChannel - impl.ProcessChange(watchResponse) + impl.processChange(watchResponse) } } -func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) { +func (impl ConfigSourceEtcd) processChange(response *etcd.Response) { glog.Infof("Processing a change in service configuration... %s", *response) // If it's a new service being added (signified by a localport being added) // then process it as such if strings.Contains(response.Node.Key, "/endpoints/") { - impl.ProcessEndpointResponse(response) + impl.processEndpointResponse(response) } else if response.Action == "set" { - service, err := EtcdResponseToService(response) + service, err := etcdResponseToService(response) if err != nil { glog.Errorf("Failed to parse %s Port: %s", response, err) return @@ -208,13 +213,12 @@ func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) { serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}} impl.serviceChannel <- serviceUpdate return - } else { - glog.Infof("Unknown service delete: %#v", parts) } + glog.Infof("Unknown service delete: %#v", parts) } } -func (impl ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) { +func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) { glog.Infof("Processing a change in endpoint configuration... %s", *response) var endpoints api.Endpoints err := json.Unmarshal([]byte(response.Node.Value), &endpoints) diff --git a/pkg/proxy/config/etcd_test.go b/pkg/proxy/config/etcd_test.go index 7c7a832f0df..9b1dafbcccb 100644 --- a/pkg/proxy/config/etcd_test.go +++ b/pkg/proxy/config/etcd_test.go @@ -26,15 +26,15 @@ import ( const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34" const TomcatService = "tomcat" -const TomcatContainerId = "tomcat-3bd5af34" +const TomcatContainerID = "tomcat-3bd5af34" -func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) { - endpoints, err := ParseEndpoints(jsonString) +func validateJSONParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) { + endpoints, err := parseEndpoints(jsonString) if err == nil && expectError { - t.Errorf("ValidateJsonParsing did not get expected error when parsing %s", jsonString) + t.Errorf("validateJSONParsing did not get expected error when parsing %s", jsonString) } if err != nil && !expectError { - t.Errorf("ValidateJsonParsing got unexpected error %+v when parsing %s", err, jsonString) + t.Errorf("validateJSONParsing got unexpected error %+v when parsing %s", err, jsonString) } if !reflect.DeepEqual(expectedEndpoints, endpoints) { t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints) @@ -42,7 +42,7 @@ func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api. } func TestParseJsonEndpoints(t *testing.T) { - ValidateJsonParsing(t, "", api.Endpoints{}, true) + validateJSONParsing(t, "", api.Endpoints{}, true) endpoints := api.Endpoints{ Name: "foo", Endpoints: []string{"foo", "bar", "baz"}, @@ -51,6 +51,6 @@ func TestParseJsonEndpoints(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %#v", err) } - ValidateJsonParsing(t, string(data), endpoints, false) - // ValidateJsonParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false) + validateJSONParsing(t, string(data), endpoints, false) + // validateJSONParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false) } diff --git a/pkg/proxy/config/file.go b/pkg/proxy/config/file.go index 6ae0a09db8d..2053691bae2 100644 --- a/pkg/proxy/config/file.go +++ b/pkg/proxy/config/file.go @@ -28,6 +28,7 @@ limitations under the License. // } //] //} + package config import ( @@ -41,22 +42,24 @@ import ( "github.com/golang/glog" ) -// TODO: kill this struct. -type ServiceJSON struct { - Name string - Port int - Endpoints []string -} -type ConfigFile struct { - Services []ServiceJSON +// serviceConfig is a deserialized form of the config file format which ConfigSourceFile accepts. +type serviceConfig struct { + Services []struct { + Name string `json: "name"` + Port int `json: "port"` + Endpoints []string `json: "endpoints"` + } `json: "service"` } +// ConfigSourceFile periodically reads service configurations in JSON from a file, and sends the services and endpoints defined in th file to the specified channels. type ConfigSourceFile struct { serviceChannel chan ServiceUpdate endpointsChannel chan EndpointsUpdate filename string } +// NewConfigSourceFile creates a new ConfigSourceFile. +// It immediately runs the created ConfigSourceFile in a goroutine. func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceFile { config := ConfigSourceFile{ filename: filename, @@ -67,6 +70,7 @@ func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, end return config } +// Run begins watching the config file. func (impl ConfigSourceFile) Run() { glog.Infof("Watching file %s", impl.filename) var lastData []byte @@ -78,7 +82,7 @@ func (impl ConfigSourceFile) Run() { if err != nil { glog.Errorf("Couldn't read file: %s : %v", impl.filename, err) } else { - var config ConfigFile + var config serviceConfig err = json.Unmarshal(data, &config) if err != nil { glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err) diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 5e781456d5e..f957a51b06e 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -38,8 +38,8 @@ func NewProxier(loadBalancer LoadBalancer) *Proxier { return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)} } -// CopyBytes copies bytes from in to out until EOF. -func CopyBytes(in, out *net.TCPConn) { +// copyBytes copies bytes from in to out until EOF. +func copyBytes(in, out *net.TCPConn) { glog.Infof("Copying from %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) _, err := io.Copy(in, out) @@ -51,15 +51,17 @@ func CopyBytes(in, out *net.TCPConn) { out.CloseWrite() } -// ProxyConnection creates a bidirectional byte shuffler. -// Copies bytes to/from each connection. -func ProxyConnection(in, out *net.TCPConn) { +// proxyConnection creates a bidirectional byte shuffler. +// It copies bytes to/from each connection. +func proxyConnection(in, out *net.TCPConn) { glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) - go CopyBytes(in, out) - go CopyBytes(out, in) + go copyBytes(in, out) + go copyBytes(out, in) } +// AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints. +// It never returns. func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { for { inConn, err := listener.Accept() @@ -86,12 +88,12 @@ func (proxier Proxier) AcceptHandler(service string, listener net.Listener) { inConn.Close() continue } - go ProxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) + go proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn)) } } -// AddService starts listening for a new service on a given port. -func (proxier Proxier) AddService(service string, port int) error { +// addService starts listening for a new service on a given port. +func (proxier Proxier) addService(service string, port int) error { // Make sure we can start listening on the port before saying all's well. l, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { @@ -127,7 +129,7 @@ func (proxier Proxier) OnUpdate(services []api.Service) { port, exists := proxier.serviceMap[service.ID] if !exists || port != service.Port { glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) - err := proxier.AddService(service.ID, service.Port) + err := proxier.addService(service.ID, service.Port) if err == nil { proxier.serviceMap[service.ID] = service.Port } else { diff --git a/pkg/proxy/roundrobbin.go b/pkg/proxy/roundrobbin.go index a5474843737..d21831e8e6c 100644 --- a/pkg/proxy/roundrobbin.go +++ b/pkg/proxy/roundrobbin.go @@ -30,7 +30,7 @@ import ( "github.com/golang/glog" ) -// LoadBalancerRR is a round-robin load balancer. +// LoadBalancerRR is a round-robin load balancer. It implements LoadBalancer. type LoadBalancerRR struct { lock sync.RWMutex endpointsMap map[string][]string @@ -42,9 +42,7 @@ func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)} } -// LoadBalance registers srcAddr for the provided service. -// It returns error if no entry is available for the service -// or no previous endpoints are registered. +// LoadBalance select an endpoint of the service by round-robin algorithm. func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) { impl.lock.RLock() endpoints, exists := impl.endpointsMap[service] @@ -61,8 +59,8 @@ func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string return endpoint, nil } -// IsValid returns true if spec is valid. -func (impl LoadBalancerRR) IsValid(spec string) bool { +// isValid returns true if spec is valid. +func (impl LoadBalancerRR) isValid(spec string) bool { index := strings.Index(spec, ":") if index == -1 { return false @@ -74,11 +72,11 @@ func (impl LoadBalancerRR) IsValid(spec string) bool { return value > 0 } -// FilterValidEndpoints filters out invalid endpoints. -func (impl LoadBalancerRR) FilterValidEndpoints(endpoints []string) []string { +// filterValidEndpoints filters out invalid endpoints. +func (impl LoadBalancerRR) filterValidEndpoints(endpoints []string) []string { var result []string for _, spec := range endpoints { - if impl.IsValid(spec) { + if impl.isValid(spec) { result = append(result, spec) } } @@ -97,7 +95,7 @@ func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { existingEndpoints, exists := impl.endpointsMap[value.Name] if !exists || !reflect.DeepEqual(value.Endpoints, existingEndpoints) { glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints) - impl.endpointsMap[value.Name] = impl.FilterValidEndpoints(value.Endpoints) + impl.endpointsMap[value.Name] = impl.filterValidEndpoints(value.Endpoints) // Start RR from the beginning if added or updated. impl.rrIndex[value.Name] = 0 } diff --git a/pkg/proxy/roundrobbin_test.go b/pkg/proxy/roundrobbin_test.go index 216cdbd2de0..1112141de80 100644 --- a/pkg/proxy/roundrobbin_test.go +++ b/pkg/proxy/roundrobbin_test.go @@ -24,16 +24,16 @@ import ( func TestLoadBalanceValidateWorks(t *testing.T) { loadBalancer := NewLoadBalancerRR() - if loadBalancer.IsValid("") { + if loadBalancer.isValid("") { t.Errorf("Didn't fail for empty string") } - if loadBalancer.IsValid("foobar") { + if loadBalancer.isValid("foobar") { t.Errorf("Didn't fail with no port") } - if loadBalancer.IsValid("foobar:-1") { + if loadBalancer.isValid("foobar:-1") { t.Errorf("Didn't fail with a negative port") } - if !loadBalancer.IsValid("foobar:8080") { + if !loadBalancer.isValid("foobar:8080") { t.Errorf("Failed a valid config.") } } @@ -41,7 +41,7 @@ func TestLoadBalanceValidateWorks(t *testing.T) { func TestLoadBalanceFilterWorks(t *testing.T) { loadBalancer := NewLoadBalancerRR() endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"} - filtered := loadBalancer.FilterValidEndpoints(endpoints) + filtered := loadBalancer.filterValidEndpoints(endpoints) if len(filtered) != 3 { t.Errorf("Failed to filter to the correct size") From 60dd1f7cc0c1977c67c55675d9c0efa2723b8872 Mon Sep 17 00:00:00 2001 From: Yuki Yugui Sonoda Date: Tue, 15 Jul 2014 20:54:05 +0900 Subject: [PATCH 08/10] Eliminates tautological comments --- pkg/labels/labels.go | 4 ++-- pkg/master/master.go | 8 ++++---- pkg/master/pod_cache.go | 5 +++-- pkg/proxy/config/config.go | 4 ++-- pkg/proxy/config/etcd.go | 3 +-- pkg/proxy/config/file.go | 5 ++--- pkg/proxy/proxier.go | 6 +++--- pkg/proxy/roundrobbin.go | 3 +-- 8 files changed, 18 insertions(+), 20 deletions(-) diff --git a/pkg/labels/labels.go b/pkg/labels/labels.go index 5c980d4dea0..8c1e050f876 100644 --- a/pkg/labels/labels.go +++ b/pkg/labels/labels.go @@ -23,7 +23,7 @@ import ( // Labels allows you to present labels independently from their storage. type Labels interface { - // Get returns the value identified by the label. + // Get returns the value for the provided label. Get(label string) (value string) } @@ -42,7 +42,7 @@ func (ls Set) String() string { return strings.Join(selector, ",") } -// Get returns the value for the provided label. +// Get returns the value in the map for the provided label. func (ls Set) Get(label string) string { return ls[label] } diff --git a/pkg/master/master.go b/pkg/master/master.go index 84816d6c5d9..7434ee135d4 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -43,7 +43,7 @@ type Master struct { storage map[string]apiserver.RESTStorage } -// NewMemoryServer returns a memory (not etcd) backed apiserver. +// NewMemoryServer returns a new instance of Master backed with memory (not etcd). func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface) *Master { m := &Master{ podRegistry: registry.MakeMemoryRegistry(), @@ -55,7 +55,7 @@ func NewMemoryServer(minions []string, podInfoGetter client.PodInfoGetter, cloud return m } -// New returns a new apiserver. +// New returns a new instance of Master connected to the given etcdServer. func New(etcdServers, minions []string, podInfoGetter client.PodInfoGetter, cloud cloudprovider.Interface, minionRegexp string) *Master { etcdClient := etcd.NewClient(etcdServers) minionRegistry := minionRegistryMaker(minions, cloud, minionRegexp) @@ -94,7 +94,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf } -// Run runs master. Never returns. +// Run begins serving the Kubernetes API. It never returns. func (m *Master) Run(myAddress, apiPrefix string) error { endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) @@ -111,7 +111,7 @@ func (m *Master) Run(myAddress, apiPrefix string) error { // ConstructHandler returns an http.Handler which serves Kubernetes API. // Instead of calling Run, you can call this function to get a handler for your own server. -// Intended for testing. Only call once. +// It is intended for testing. Only call once. func (m *Master) ConstructHandler(apiPrefix string) http.Handler { endpoints := registry.MakeEndpointController(m.serviceRegistry, m.podRegistry) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index a5dffe96dec..c6be9a9f9dd 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -40,7 +40,7 @@ type PodCache struct { podLock sync.Mutex } -// NewPodCache returns a new PodCache. +// NewPodCache returns a new PodCache which watches container information registered in the given PodRegistry. func NewPodCache(info client.PodInfoGetter, pods registry.PodRegistry, period time.Duration) *PodCache { return &PodCache{ containerInfo: info, @@ -88,7 +88,8 @@ func (p *PodCache) UpdateAllContainers() { } } -// Loop runs forever, it is expected to be placed in a go routine. +// Loop begins watching updates of container information. +// It runs forever, and is expected to be placed in a go routine. func (p *PodCache) Loop() { util.Forever(func() { p.UpdateAllContainers() }, p.period) } diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 9ceec048b04..9a95e037c3a 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -54,14 +54,14 @@ type EndpointsUpdate struct { Op Operation } -// ServiceConfigHandler handles update notifications of the set of services. +// ServiceConfigHandler is an abstract interface of objects which receive update notifications of the set of services. type ServiceConfigHandler interface { // OnUpdate gets called when a configuration has been changed by one of the sources. // This is the union of all the configuration sources. OnUpdate(services []api.Service) } -// EndpointsConfigHandler handles update notifications of the set of endpoints. +// EndpointsConfigHandler is an abstract interface of objects which receive update notifications of the set of endpoints. type EndpointsConfigHandler interface { // OnUpdate gets called when endpoints configuration is changed for a given // service on any of the configuration sources. An example is when a new diff --git a/pkg/proxy/config/etcd.go b/pkg/proxy/config/etcd.go index cac508a649b..89488fca4f2 100644 --- a/pkg/proxy/config/etcd.go +++ b/pkg/proxy/config/etcd.go @@ -54,8 +54,7 @@ type ConfigSourceEtcd struct { endpointsChannel chan EndpointsUpdate } -// NewConfigSourceEtcd creates a new ConfigSourceEtcd. -// It immediately runs the created ConfigSourceEtcd in a goroutine. +// NewConfigSourceEtcd creates a new ConfigSourceEtcd and immediately runs the created ConfigSourceEtcd in a goroutine. func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd { config := ConfigSourceEtcd{ client: client, diff --git a/pkg/proxy/config/file.go b/pkg/proxy/config/file.go index e0c1755621c..ac6810bdec3 100644 --- a/pkg/proxy/config/file.go +++ b/pkg/proxy/config/file.go @@ -51,15 +51,14 @@ type serviceConfig struct { } `json: "service"` } -// ConfigSourceFile periodically reads service configurations in JSON from a file, and sends the services and endpoints defined in th file to the specified channels. +// ConfigSourceFile periodically reads service configurations in JSON from a file, and sends the services and endpoints defined in the file to the specified channels. type ConfigSourceFile struct { serviceChannel chan ServiceUpdate endpointsChannel chan EndpointsUpdate filename string } -// NewConfigSourceFile creates a new ConfigSourceFile. -// It immediately runs the created ConfigSourceFile in a goroutine. +// NewConfigSourceFile creates a new ConfigSourceFile and let it immediately runs the created ConfigSourceFile in a goroutine. func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceFile { config := ConfigSourceFile{ filename: filename, diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 0746dd24a5f..602b7c69b87 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -33,12 +33,11 @@ type Proxier struct { serviceMap map[string]int } -// NewProxier returns a new Proxier. +// NewProxier returns a newly created and correctly initialized instance of Proxier. func NewProxier(loadBalancer LoadBalancer) *Proxier { return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)} } -// copyBytes copies bytes from in to out until EOF. func copyBytes(in, out *net.TCPConn) { glog.Infof("Copying from %v <-> %v <-> %v <-> %v", in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) @@ -122,7 +121,8 @@ func (proxier Proxier) addServiceCommon(service string, l net.Listener) { go proxier.AcceptHandler(service, l) } -// OnUpdate handles update notices for the updated services. +// OnUpdate recieves update notices for the updated services and start listening newly added services. +// It implements "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config".ServiceConfigHandler.OnUpdate. func (proxier Proxier) OnUpdate(services []api.Service) { glog.Infof("Received update notice: %+v", services) for _, service := range services { diff --git a/pkg/proxy/roundrobbin.go b/pkg/proxy/roundrobbin.go index 60eb41c5410..e6e192e399b 100644 --- a/pkg/proxy/roundrobbin.go +++ b/pkg/proxy/roundrobbin.go @@ -36,7 +36,7 @@ type LoadBalancerRR struct { rrIndex map[string]int } -// NewLoadBalancerRR returns a new LoadBalancerRR. +// NewLoadBalancerRR returns a newly created and correctly initialized instance of LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)} } @@ -70,7 +70,6 @@ func (impl LoadBalancerRR) isValid(spec string) bool { return value > 0 } -// filterValidEndpoints filters out invalid endpoints. func (impl LoadBalancerRR) filterValidEndpoints(endpoints []string) []string { var result []string for _, spec := range endpoints { From 1bd7276acaf3ccfbc2eefe98ed83578eaf77436f Mon Sep 17 00:00:00 2001 From: Yuki Yugui Sonoda Date: Tue, 15 Jul 2014 20:55:04 +0900 Subject: [PATCH 09/10] Correct the style of nested conditionals --- pkg/proxy/proxier.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 602b7c69b87..a7822e56a29 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -127,14 +127,15 @@ func (proxier Proxier) OnUpdate(services []api.Service) { glog.Infof("Received update notice: %+v", services) for _, service := range services { port, exists := proxier.serviceMap[service.ID] - if !exists || port != service.Port { - glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) - err := proxier.addService(service.ID, service.Port) - if err == nil { - proxier.serviceMap[service.ID] = service.Port - } else { - glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port) - } + if exists && port == service.Port { + continue } + glog.Infof("Adding a new service %s on port %d", service.ID, service.Port) + err := proxier.addService(service.ID, service.Port) + if err != nil { + glog.Infof("Failed to start listening for %s on %d", service.ID, service.Port) + continue + } + proxier.serviceMap[service.ID] = service.Port } } From 28619550250bcabf4fc0c5ca49290352e785d004 Mon Sep 17 00:00:00 2001 From: Yuki Yugui Sonoda Date: Tue, 15 Jul 2014 22:03:08 +0900 Subject: [PATCH 10/10] Corrects wording of godoc comments. --- pkg/master/master.go | 2 +- pkg/proxy/config/config.go | 4 ++-- pkg/proxy/roundrobbin.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/master/master.go b/pkg/master/master.go index 7434ee135d4..e12fc7f28ec 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -109,7 +109,7 @@ func (m *Master) Run(myAddress, apiPrefix string) error { return s.ListenAndServe() } -// ConstructHandler returns an http.Handler which serves Kubernetes API. +// ConstructHandler returns an http.Handler which serves the Kubernetes API. // Instead of calling Run, you can call this function to get a handler for your own server. // It is intended for testing. Only call once. func (m *Master) ConstructHandler(apiPrefix string) http.Handler { diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 9a95e037c3a..7f17d71d6f5 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -54,14 +54,14 @@ type EndpointsUpdate struct { Op Operation } -// ServiceConfigHandler is an abstract interface of objects which receive update notifications of the set of services. +// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services. type ServiceConfigHandler interface { // OnUpdate gets called when a configuration has been changed by one of the sources. // This is the union of all the configuration sources. OnUpdate(services []api.Service) } -// EndpointsConfigHandler is an abstract interface of objects which receive update notifications of the set of endpoints. +// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints. type EndpointsConfigHandler interface { // OnUpdate gets called when endpoints configuration is changed for a given // service on any of the configuration sources. An example is when a new diff --git a/pkg/proxy/roundrobbin.go b/pkg/proxy/roundrobbin.go index e6e192e399b..a5acebba7fc 100644 --- a/pkg/proxy/roundrobbin.go +++ b/pkg/proxy/roundrobbin.go @@ -41,7 +41,7 @@ func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)} } -// LoadBalance select an endpoint of the service by round-robin algorithm. +// LoadBalance selects an endpoint of the service by round-robin algorithm. func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) { impl.lock.RLock() endpoints, exists := impl.endpointsMap[service]