mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 00:07:50 +00:00
Fixes golint errors in pkg/proxy
This commit is contained in:
parent
88284171f2
commit
7373695e33
@ -24,40 +24,44 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Operation is a type of operation of services or endpoints.
|
||||||
type Operation int
|
type Operation int
|
||||||
|
|
||||||
|
// These are the available operation types.
|
||||||
const (
|
const (
|
||||||
SET Operation = iota
|
SET Operation = iota
|
||||||
ADD
|
ADD
|
||||||
REMOVE
|
REMOVE
|
||||||
)
|
)
|
||||||
|
|
||||||
// Defines an operation sent on the channel. You can add or remove single services by
|
// ServiceUpdate describes an operation of services, sent on the channel.
|
||||||
// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system
|
// You can add or remove single services by sending an array of size one and Op == ADD|REMOVE.
|
||||||
// to a given state for this source configuration, set Services as desired and Op to SET,
|
// For setting the state of the system to a given state for this source configuration, set Services as desired and Op to SET,
|
||||||
// which will reset the system state to that specified in this operation for this source
|
// which will reset the system state to that specified in this operation for this source channel.
|
||||||
// channel. To remove all services, set Services to empty array and Op to SET
|
// To remove all services, set Services to empty array and Op to SET
|
||||||
type ServiceUpdate struct {
|
type ServiceUpdate struct {
|
||||||
Services []api.Service
|
Services []api.Service
|
||||||
Op Operation
|
Op Operation
|
||||||
}
|
}
|
||||||
|
|
||||||
// Defines an operation sent on the channel. You can add or remove single endpoints by
|
// EndpointsUpdate describes an operation of endpoints, sent on the channel.
|
||||||
// sending an array of size one and Op == ADD|REMOVE. For setting the state of the system
|
// You can add or remove single endpoints by sending an array of size one and Op == ADD|REMOVE.
|
||||||
// to a given state for this source configuration, set Endpoints as desired and Op to SET,
|
// For setting the state of the system to a given state for this source configuration, set Endpoints as desired and Op to SET,
|
||||||
// which will reset the system state to that specified in this operation for this source
|
// which will reset the system state to that specified in this operation for this source channel.
|
||||||
// channel. To remove all endpoints, set Endpoints to empty array and Op to SET
|
// To remove all endpoints, set Endpoints to empty array and Op to SET
|
||||||
type EndpointsUpdate struct {
|
type EndpointsUpdate struct {
|
||||||
Endpoints []api.Endpoints
|
Endpoints []api.Endpoints
|
||||||
Op Operation
|
Op Operation
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServiceConfigHandler handles update notifications of the set of services.
|
||||||
type ServiceConfigHandler interface {
|
type ServiceConfigHandler interface {
|
||||||
// Sent when a configuration has been changed by one of the sources. This is the
|
// OnUpdate gets called when a configuration has been changed by one of the sources.
|
||||||
// union of all the configuration sources.
|
// This is the union of all the configuration sources.
|
||||||
OnUpdate(services []api.Service)
|
OnUpdate(services []api.Service)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EndpointsConfigHandler handles update notifications of the set of endpoints.
|
||||||
type EndpointsConfigHandler interface {
|
type EndpointsConfigHandler interface {
|
||||||
// OnUpdate gets called when endpoints configuration is changed for a given
|
// OnUpdate gets called when endpoints configuration is changed for a given
|
||||||
// service on any of the configuration sources. An example is when a new
|
// service on any of the configuration sources. An example is when a new
|
||||||
@ -65,6 +69,8 @@ type EndpointsConfigHandler interface {
|
|||||||
OnUpdate(endpoints []api.Endpoints)
|
OnUpdate(endpoints []api.Endpoints)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ServiceConfig tracks a set of service configurations and their endpoint configurations.
|
||||||
|
// It accepts "set", "add" and "remove" operations of services and endpoints via channels, and invokes registered handlers on change.
|
||||||
type ServiceConfig struct {
|
type ServiceConfig struct {
|
||||||
// Configuration sources and their lock.
|
// Configuration sources and their lock.
|
||||||
configSourceLock sync.RWMutex
|
configSourceLock sync.RWMutex
|
||||||
@ -94,6 +100,8 @@ type ServiceConfig struct {
|
|||||||
endpointsNotifyChannel chan string
|
endpointsNotifyChannel chan string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewServiceConfig creates a new ServiceConfig.
|
||||||
|
// It immediately runs the created ServiceConfig.
|
||||||
func NewServiceConfig() *ServiceConfig {
|
func NewServiceConfig() *ServiceConfig {
|
||||||
config := &ServiceConfig{
|
config := &ServiceConfig{
|
||||||
serviceConfigSources: make(map[string]chan ServiceUpdate),
|
serviceConfigSources: make(map[string]chan ServiceUpdate),
|
||||||
@ -109,22 +117,26 @@ func NewServiceConfig() *ServiceConfig {
|
|||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run begins a loop to accept new service configurations and new endpoint configurations.
|
||||||
|
// It never returns.
|
||||||
func (impl *ServiceConfig) Run() {
|
func (impl *ServiceConfig) Run() {
|
||||||
glog.Infof("Starting the config Run loop")
|
glog.Infof("Starting the config Run loop")
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case source := <-impl.serviceNotifyChannel:
|
case source := <-impl.serviceNotifyChannel:
|
||||||
glog.Infof("Got new service configuration from source %s", source)
|
glog.Infof("Got new service configuration from source %s", source)
|
||||||
impl.NotifyServiceUpdate()
|
impl.notifyServiceUpdate()
|
||||||
case source := <-impl.endpointsNotifyChannel:
|
case source := <-impl.endpointsNotifyChannel:
|
||||||
glog.Infof("Got new endpoint configuration from source %s", source)
|
glog.Infof("Got new endpoint configuration from source %s", source)
|
||||||
impl.NotifyEndpointsUpdate()
|
impl.notifyEndpointsUpdate()
|
||||||
case <-time.After(1 * time.Second):
|
case <-time.After(1 * time.Second):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel chan ServiceUpdate) {
|
// serviceChannelListener begins a loop to handle incoming ServiceUpdate notifications from the channel.
|
||||||
|
// It never returns.
|
||||||
|
func (impl *ServiceConfig) serviceChannelListener(source string, listenChannel chan ServiceUpdate) {
|
||||||
// Represents the current services configuration for this channel.
|
// Represents the current services configuration for this channel.
|
||||||
serviceMap := make(map[string]api.Service)
|
serviceMap := make(map[string]api.Service)
|
||||||
for {
|
for {
|
||||||
@ -160,7 +172,9 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel chan EndpointsUpdate) {
|
// endpointsChannelListener begins a loop to handle incoming EndpointsUpdate notifications from the channel.
|
||||||
|
// It never returns.
|
||||||
|
func (impl *ServiceConfig) endpointsChannelListener(source string, listenChannel chan EndpointsUpdate) {
|
||||||
endpointMap := make(map[string]api.Endpoints)
|
endpointMap := make(map[string]api.Endpoints)
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -214,11 +228,11 @@ func (impl *ServiceConfig) GetServiceConfigurationChannel(source string) chan Se
|
|||||||
}
|
}
|
||||||
newChannel := make(chan ServiceUpdate)
|
newChannel := make(chan ServiceUpdate)
|
||||||
impl.serviceConfigSources[source] = newChannel
|
impl.serviceConfigSources[source] = newChannel
|
||||||
go impl.ServiceChannelListener(source, newChannel)
|
go impl.serviceChannelListener(source, newChannel)
|
||||||
return newChannel
|
return newChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEndpointConfigurationChannel returns a channel where a configuration source
|
// GetEndpointsConfigurationChannel returns a channel where a configuration source
|
||||||
// can send updates of new endpoint configurations. Multiple calls with the same
|
// can send updates of new endpoint configurations. Multiple calls with the same
|
||||||
// source will return the same channel. This allows change and state based sources
|
// source will return the same channel. This allows change and state based sources
|
||||||
// to use the same channel. Difference source names however will be treated as a
|
// to use the same channel. Difference source names however will be treated as a
|
||||||
@ -235,11 +249,11 @@ func (impl *ServiceConfig) GetEndpointsConfigurationChannel(source string) chan
|
|||||||
}
|
}
|
||||||
newChannel := make(chan EndpointsUpdate)
|
newChannel := make(chan EndpointsUpdate)
|
||||||
impl.endpointsConfigSources[source] = newChannel
|
impl.endpointsConfigSources[source] = newChannel
|
||||||
go impl.EndpointsChannelListener(source, newChannel)
|
go impl.endpointsChannelListener(source, newChannel)
|
||||||
return newChannel
|
return newChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register ServiceConfigHandler to receive updates of changes to services.
|
// RegisterServiceHandler registers the ServiceConfigHandler to receive updates of changes to services.
|
||||||
func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) {
|
func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler) {
|
||||||
impl.handlerLock.Lock()
|
impl.handlerLock.Lock()
|
||||||
defer impl.handlerLock.Unlock()
|
defer impl.handlerLock.Unlock()
|
||||||
@ -255,7 +269,7 @@ func (impl *ServiceConfig) RegisterServiceHandler(handler ServiceConfigHandler)
|
|||||||
panic("Only up to 10 service handlers supported for now")
|
panic("Only up to 10 service handlers supported for now")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register ServiceConfigHandler to receive updates of changes to services.
|
// RegisterEndpointsHandler registers the EndpointsConfigHandler to receive updates of changes to services.
|
||||||
func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) {
|
func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandler) {
|
||||||
impl.handlerLock.Lock()
|
impl.handlerLock.Lock()
|
||||||
defer impl.handlerLock.Unlock()
|
defer impl.handlerLock.Unlock()
|
||||||
@ -271,8 +285,9 @@ func (impl *ServiceConfig) RegisterEndpointsHandler(handler EndpointsConfigHandl
|
|||||||
panic("Only up to 10 endpoint handlers supported for now")
|
panic("Only up to 10 endpoint handlers supported for now")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (impl *ServiceConfig) NotifyServiceUpdate() {
|
// notifyServiceUpdate calls the registered ServiceConfigHandlers with the current states of services.
|
||||||
services := make([]api.Service, 0)
|
func (impl *ServiceConfig) notifyServiceUpdate() {
|
||||||
|
services := []api.Service{}
|
||||||
impl.configLock.RLock()
|
impl.configLock.RLock()
|
||||||
for _, sourceServices := range impl.serviceConfig {
|
for _, sourceServices := range impl.serviceConfig {
|
||||||
for _, value := range sourceServices {
|
for _, value := range sourceServices {
|
||||||
@ -291,8 +306,9 @@ func (impl *ServiceConfig) NotifyServiceUpdate() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (impl *ServiceConfig) NotifyEndpointsUpdate() {
|
// notifyEndpointsUpdate calls the registered EndpointsConfigHandlers with the current states of endpoints.
|
||||||
endpoints := make([]api.Endpoints, 0)
|
func (impl *ServiceConfig) notifyEndpointsUpdate() {
|
||||||
|
endpoints := []api.Endpoints{}
|
||||||
impl.configLock.RLock()
|
impl.configLock.RLock()
|
||||||
for _, sourceEndpoints := range impl.endpointConfig {
|
for _, sourceEndpoints := range impl.endpointConfig {
|
||||||
for _, value := range sourceEndpoints {
|
for _, value := range sourceEndpoints {
|
||||||
|
@ -15,7 +15,7 @@ limitations under the License.
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
// Watches etcd and gets the full configuration on preset intervals.
|
// Watches etcd and gets the full configuration on preset intervals.
|
||||||
// Expects the list of exposed services to live under:
|
// It expects the list of exposed services to live under:
|
||||||
// registry/services
|
// registry/services
|
||||||
// which in etcd is exposed like so:
|
// which in etcd is exposed like so:
|
||||||
// http://<etcd server>/v2/keys/registry/services
|
// http://<etcd server>/v2/keys/registry/services
|
||||||
@ -30,7 +30,7 @@ limitations under the License.
|
|||||||
// '[ { "machine": <host>, "name": <name", "port": <port> },
|
// '[ { "machine": <host>, "name": <name", "port": <port> },
|
||||||
// { "machine": <host2>, "name": <name2", "port": <port2> }
|
// { "machine": <host2>, "name": <name2", "port": <port2> }
|
||||||
// ]',
|
// ]',
|
||||||
//
|
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -44,14 +44,18 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
const RegistryRoot = "registry/services"
|
// registryRoot is the key prefix for service configs in etcd.
|
||||||
|
const registryRoot = "registry/services"
|
||||||
|
|
||||||
|
// ConfigSourceEtcd communicates with a etcd via the client, and sends the change notification of services and endpoints to the specified channels.
|
||||||
type ConfigSourceEtcd struct {
|
type ConfigSourceEtcd struct {
|
||||||
client *etcd.Client
|
client *etcd.Client
|
||||||
serviceChannel chan ServiceUpdate
|
serviceChannel chan ServiceUpdate
|
||||||
endpointsChannel chan EndpointsUpdate
|
endpointsChannel chan EndpointsUpdate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewConfigSourceEtcd creates a new ConfigSourceEtcd.
|
||||||
|
// It immediately runs the created ConfigSourceEtcd in a goroutine.
|
||||||
func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd {
|
func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceEtcd {
|
||||||
config := ConfigSourceEtcd{
|
config := ConfigSourceEtcd{
|
||||||
client: client,
|
client: client,
|
||||||
@ -62,13 +66,14 @@ func NewConfigSourceEtcd(client *etcd.Client, serviceChannel chan ServiceUpdate,
|
|||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run begins watching for new services and their endpoints on etcd.
|
||||||
func (impl ConfigSourceEtcd) Run() {
|
func (impl ConfigSourceEtcd) Run() {
|
||||||
// Initially, just wait for the etcd to come up before doing anything more complicated.
|
// Initially, just wait for the etcd to come up before doing anything more complicated.
|
||||||
var services []api.Service
|
var services []api.Service
|
||||||
var endpoints []api.Endpoints
|
var endpoints []api.Endpoints
|
||||||
var err error
|
var err error
|
||||||
for {
|
for {
|
||||||
services, endpoints, err = impl.GetServices()
|
services, endpoints, err = impl.getServices()
|
||||||
if err == nil {
|
if err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -87,10 +92,10 @@ func (impl ConfigSourceEtcd) Run() {
|
|||||||
|
|
||||||
// Ok, so we got something back from etcd. Let's set up a watch for new services, and
|
// Ok, so we got something back from etcd. Let's set up a watch for new services, and
|
||||||
// their endpoints
|
// their endpoints
|
||||||
go impl.WatchForChanges()
|
go impl.watchForChanges()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
services, endpoints, err = impl.GetServices()
|
services, endpoints, err = impl.getServices()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err)
|
glog.Errorf("ConfigSourceEtcd: Failed to get services: %v", err)
|
||||||
} else {
|
} else {
|
||||||
@ -107,12 +112,12 @@ func (impl ConfigSourceEtcd) Run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finds the list of services and their endpoints from etcd.
|
// getServices finds the list of services and their endpoints from etcd.
|
||||||
// This operation is akin to a set a known good at regular intervals.
|
// This operation is akin to a set a known good at regular intervals.
|
||||||
func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) {
|
func (impl ConfigSourceEtcd) getServices() ([]api.Service, []api.Endpoints, error) {
|
||||||
response, err := impl.client.Get(RegistryRoot+"/specs", true, false)
|
response, err := impl.client.Get(registryRoot+"/specs", true, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to get the key %s: %v", RegistryRoot, err)
|
glog.Errorf("Failed to get the key %s: %v", registryRoot, err)
|
||||||
return make([]api.Service, 0), make([]api.Endpoints, 0), err
|
return make([]api.Service, 0), make([]api.Endpoints, 0), err
|
||||||
}
|
}
|
||||||
if response.Node.Dir == true {
|
if response.Node.Dir == true {
|
||||||
@ -129,7 +134,7 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
retServices[i] = svc
|
retServices[i] = svc
|
||||||
endpoints, err := impl.GetEndpoints(svc.ID)
|
endpoints, err := impl.getEndpoints(svc.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err)
|
glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err)
|
||||||
}
|
}
|
||||||
@ -138,23 +143,23 @@ func (impl ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, erro
|
|||||||
}
|
}
|
||||||
return retServices, retEndpoints, err
|
return retServices, retEndpoints, err
|
||||||
}
|
}
|
||||||
return nil, nil, fmt.Errorf("did not get the root of the registry %s", RegistryRoot)
|
return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (impl ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) {
|
// getEndpoints finds the list of endpoints of the service from etcd.
|
||||||
key := fmt.Sprintf(RegistryRoot + "/endpoints/" + service)
|
func (impl ConfigSourceEtcd) getEndpoints(service string) (api.Endpoints, error) {
|
||||||
|
key := fmt.Sprintf(registryRoot + "/endpoints/" + service)
|
||||||
response, err := impl.client.Get(key, true, false)
|
response, err := impl.client.Get(key, true, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to get the key: %s %v", key, err)
|
glog.Errorf("Failed to get the key: %s %v", key, err)
|
||||||
return api.Endpoints{}, err
|
return api.Endpoints{}, err
|
||||||
}
|
}
|
||||||
// Parse all the endpoint specifications in this value.
|
// Parse all the endpoint specifications in this value.
|
||||||
return ParseEndpoints(response.Node.Value)
|
return parseEndpoints(response.Node.Value)
|
||||||
}
|
}
|
||||||
|
|
||||||
// EtcdResponseToServiceAndLocalport takes an etcd response and pulls it apart to find
|
// etcdResponseToService takes an etcd response and pulls it apart to find service.
|
||||||
// service
|
func etcdResponseToService(response *etcd.Response) (*api.Service, error) {
|
||||||
func EtcdResponseToService(response *etcd.Response) (*api.Service, error) {
|
|
||||||
if response.Node == nil {
|
if response.Node == nil {
|
||||||
return nil, fmt.Errorf("invalid response from etcd: %#v", response)
|
return nil, fmt.Errorf("invalid response from etcd: %#v", response)
|
||||||
}
|
}
|
||||||
@ -166,31 +171,31 @@ func EtcdResponseToService(response *etcd.Response) (*api.Service, error) {
|
|||||||
return &svc, err
|
return &svc, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func ParseEndpoints(jsonString string) (api.Endpoints, error) {
|
func parseEndpoints(jsonString string) (api.Endpoints, error) {
|
||||||
var e api.Endpoints
|
var e api.Endpoints
|
||||||
err := json.Unmarshal([]byte(jsonString), &e)
|
err := json.Unmarshal([]byte(jsonString), &e)
|
||||||
return e, err
|
return e, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (impl ConfigSourceEtcd) WatchForChanges() {
|
func (impl ConfigSourceEtcd) watchForChanges() {
|
||||||
glog.Info("Setting up a watch for new services")
|
glog.Info("Setting up a watch for new services")
|
||||||
watchChannel := make(chan *etcd.Response)
|
watchChannel := make(chan *etcd.Response)
|
||||||
go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil)
|
go impl.client.Watch("/registry/services/", 0, true, watchChannel, nil)
|
||||||
for {
|
for {
|
||||||
watchResponse := <-watchChannel
|
watchResponse := <-watchChannel
|
||||||
impl.ProcessChange(watchResponse)
|
impl.processChange(watchResponse)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
|
func (impl ConfigSourceEtcd) processChange(response *etcd.Response) {
|
||||||
glog.Infof("Processing a change in service configuration... %s", *response)
|
glog.Infof("Processing a change in service configuration... %s", *response)
|
||||||
|
|
||||||
// If it's a new service being added (signified by a localport being added)
|
// If it's a new service being added (signified by a localport being added)
|
||||||
// then process it as such
|
// then process it as such
|
||||||
if strings.Contains(response.Node.Key, "/endpoints/") {
|
if strings.Contains(response.Node.Key, "/endpoints/") {
|
||||||
impl.ProcessEndpointResponse(response)
|
impl.processEndpointResponse(response)
|
||||||
} else if response.Action == "set" {
|
} else if response.Action == "set" {
|
||||||
service, err := EtcdResponseToService(response)
|
service, err := etcdResponseToService(response)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Failed to parse %s Port: %s", response, err)
|
glog.Errorf("Failed to parse %s Port: %s", response, err)
|
||||||
return
|
return
|
||||||
@ -208,13 +213,12 @@ func (impl ConfigSourceEtcd) ProcessChange(response *etcd.Response) {
|
|||||||
serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}}
|
serviceUpdate := ServiceUpdate{Op: REMOVE, Services: []api.Service{{JSONBase: api.JSONBase{ID: parts[3]}}}}
|
||||||
impl.serviceChannel <- serviceUpdate
|
impl.serviceChannel <- serviceUpdate
|
||||||
return
|
return
|
||||||
} else {
|
}
|
||||||
glog.Infof("Unknown service delete: %#v", parts)
|
glog.Infof("Unknown service delete: %#v", parts)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func (impl ConfigSourceEtcd) ProcessEndpointResponse(response *etcd.Response) {
|
func (impl ConfigSourceEtcd) processEndpointResponse(response *etcd.Response) {
|
||||||
glog.Infof("Processing a change in endpoint configuration... %s", *response)
|
glog.Infof("Processing a change in endpoint configuration... %s", *response)
|
||||||
var endpoints api.Endpoints
|
var endpoints api.Endpoints
|
||||||
err := json.Unmarshal([]byte(response.Node.Value), &endpoints)
|
err := json.Unmarshal([]byte(response.Node.Value), &endpoints)
|
||||||
|
@ -26,15 +26,15 @@ import (
|
|||||||
|
|
||||||
const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34"
|
const TomcatContainerEtcdKey = "/registry/services/tomcat/endpoints/tomcat-3bd5af34"
|
||||||
const TomcatService = "tomcat"
|
const TomcatService = "tomcat"
|
||||||
const TomcatContainerId = "tomcat-3bd5af34"
|
const TomcatContainerID = "tomcat-3bd5af34"
|
||||||
|
|
||||||
func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) {
|
func validateJSONParsing(t *testing.T, jsonString string, expectedEndpoints api.Endpoints, expectError bool) {
|
||||||
endpoints, err := ParseEndpoints(jsonString)
|
endpoints, err := parseEndpoints(jsonString)
|
||||||
if err == nil && expectError {
|
if err == nil && expectError {
|
||||||
t.Errorf("ValidateJsonParsing did not get expected error when parsing %s", jsonString)
|
t.Errorf("validateJSONParsing did not get expected error when parsing %s", jsonString)
|
||||||
}
|
}
|
||||||
if err != nil && !expectError {
|
if err != nil && !expectError {
|
||||||
t.Errorf("ValidateJsonParsing got unexpected error %+v when parsing %s", err, jsonString)
|
t.Errorf("validateJSONParsing got unexpected error %+v when parsing %s", err, jsonString)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(expectedEndpoints, endpoints) {
|
if !reflect.DeepEqual(expectedEndpoints, endpoints) {
|
||||||
t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints)
|
t.Errorf("Didn't get expected endpoints %+v got: %+v", expectedEndpoints, endpoints)
|
||||||
@ -42,7 +42,7 @@ func ValidateJsonParsing(t *testing.T, jsonString string, expectedEndpoints api.
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestParseJsonEndpoints(t *testing.T) {
|
func TestParseJsonEndpoints(t *testing.T) {
|
||||||
ValidateJsonParsing(t, "", api.Endpoints{}, true)
|
validateJSONParsing(t, "", api.Endpoints{}, true)
|
||||||
endpoints := api.Endpoints{
|
endpoints := api.Endpoints{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
Endpoints: []string{"foo", "bar", "baz"},
|
Endpoints: []string{"foo", "bar", "baz"},
|
||||||
@ -51,6 +51,6 @@ func TestParseJsonEndpoints(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %#v", err)
|
t.Errorf("Unexpected error: %#v", err)
|
||||||
}
|
}
|
||||||
ValidateJsonParsing(t, string(data), endpoints, false)
|
validateJSONParsing(t, string(data), endpoints, false)
|
||||||
// ValidateJsonParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false)
|
// validateJSONParsing(t, "[{\"port\":8000,\"name\":\"mysql\",\"machine\":\"foo\"},{\"port\":9000,\"name\":\"mysql\",\"machine\":\"bar\"}]", []string{"foo:8000", "bar:9000"}, false)
|
||||||
}
|
}
|
||||||
|
@ -28,6 +28,7 @@ limitations under the License.
|
|||||||
// }
|
// }
|
||||||
//]
|
//]
|
||||||
//}
|
//}
|
||||||
|
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@ -41,22 +42,24 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: kill this struct.
|
// serviceConfig is a deserialized form of the config file format which ConfigSourceFile accepts.
|
||||||
type ServiceJSON struct {
|
type serviceConfig struct {
|
||||||
Name string
|
Services []struct {
|
||||||
Port int
|
Name string `json: "name"`
|
||||||
Endpoints []string
|
Port int `json: "port"`
|
||||||
}
|
Endpoints []string `json: "endpoints"`
|
||||||
type ConfigFile struct {
|
} `json: "service"`
|
||||||
Services []ServiceJSON
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConfigSourceFile periodically reads service configurations in JSON from a file, and sends the services and endpoints defined in th file to the specified channels.
|
||||||
type ConfigSourceFile struct {
|
type ConfigSourceFile struct {
|
||||||
serviceChannel chan ServiceUpdate
|
serviceChannel chan ServiceUpdate
|
||||||
endpointsChannel chan EndpointsUpdate
|
endpointsChannel chan EndpointsUpdate
|
||||||
filename string
|
filename string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewConfigSourceFile creates a new ConfigSourceFile.
|
||||||
|
// It immediately runs the created ConfigSourceFile in a goroutine.
|
||||||
func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceFile {
|
func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, endpointsChannel chan EndpointsUpdate) ConfigSourceFile {
|
||||||
config := ConfigSourceFile{
|
config := ConfigSourceFile{
|
||||||
filename: filename,
|
filename: filename,
|
||||||
@ -67,6 +70,7 @@ func NewConfigSourceFile(filename string, serviceChannel chan ServiceUpdate, end
|
|||||||
return config
|
return config
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Run begins watching the config file.
|
||||||
func (impl ConfigSourceFile) Run() {
|
func (impl ConfigSourceFile) Run() {
|
||||||
glog.Infof("Watching file %s", impl.filename)
|
glog.Infof("Watching file %s", impl.filename)
|
||||||
var lastData []byte
|
var lastData []byte
|
||||||
@ -78,7 +82,7 @@ func (impl ConfigSourceFile) Run() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't read file: %s : %v", impl.filename, err)
|
glog.Errorf("Couldn't read file: %s : %v", impl.filename, err)
|
||||||
} else {
|
} else {
|
||||||
var config ConfigFile
|
var config serviceConfig
|
||||||
err = json.Unmarshal(data, &config)
|
err = json.Unmarshal(data, &config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err)
|
glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err)
|
||||||
|
@ -38,8 +38,8 @@ func NewProxier(loadBalancer LoadBalancer) *Proxier {
|
|||||||
return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)}
|
return &Proxier{loadBalancer: loadBalancer, serviceMap: make(map[string]int)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CopyBytes copies bytes from in to out until EOF.
|
// copyBytes copies bytes from in to out until EOF.
|
||||||
func CopyBytes(in, out *net.TCPConn) {
|
func copyBytes(in, out *net.TCPConn) {
|
||||||
glog.Infof("Copying from %v <-> %v <-> %v <-> %v",
|
glog.Infof("Copying from %v <-> %v <-> %v <-> %v",
|
||||||
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
|
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
|
||||||
_, err := io.Copy(in, out)
|
_, err := io.Copy(in, out)
|
||||||
@ -51,15 +51,17 @@ func CopyBytes(in, out *net.TCPConn) {
|
|||||||
out.CloseWrite()
|
out.CloseWrite()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProxyConnection creates a bidirectional byte shuffler.
|
// proxyConnection creates a bidirectional byte shuffler.
|
||||||
// Copies bytes to/from each connection.
|
// It copies bytes to/from each connection.
|
||||||
func ProxyConnection(in, out *net.TCPConn) {
|
func proxyConnection(in, out *net.TCPConn) {
|
||||||
glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
|
glog.Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
|
||||||
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
|
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
|
||||||
go CopyBytes(in, out)
|
go copyBytes(in, out)
|
||||||
go CopyBytes(out, in)
|
go copyBytes(out, in)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AcceptHandler begins accepting incoming connections from listener and proxying the connections to the load-balanced endpoints.
|
||||||
|
// It never returns.
|
||||||
func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
|
func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
|
||||||
for {
|
for {
|
||||||
inConn, err := listener.Accept()
|
inConn, err := listener.Accept()
|
||||||
@ -86,12 +88,12 @@ func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
|
|||||||
inConn.Close()
|
inConn.Close()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
go ProxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
|
go proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddService starts listening for a new service on a given port.
|
// addService starts listening for a new service on a given port.
|
||||||
func (proxier Proxier) AddService(service string, port int) error {
|
func (proxier Proxier) addService(service string, port int) error {
|
||||||
// Make sure we can start listening on the port before saying all's well.
|
// Make sure we can start listening on the port before saying all's well.
|
||||||
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
|
l, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -127,7 +129,7 @@ func (proxier Proxier) OnUpdate(services []api.Service) {
|
|||||||
port, exists := proxier.serviceMap[service.ID]
|
port, exists := proxier.serviceMap[service.ID]
|
||||||
if !exists || port != service.Port {
|
if !exists || port != service.Port {
|
||||||
glog.Infof("Adding a new service %s on port %d", service.ID, service.Port)
|
glog.Infof("Adding a new service %s on port %d", service.ID, service.Port)
|
||||||
err := proxier.AddService(service.ID, service.Port)
|
err := proxier.addService(service.ID, service.Port)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
proxier.serviceMap[service.ID] = service.Port
|
proxier.serviceMap[service.ID] = service.Port
|
||||||
} else {
|
} else {
|
||||||
|
@ -30,7 +30,7 @@ import (
|
|||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
// LoadBalancerRR is a round-robin load balancer.
|
// LoadBalancerRR is a round-robin load balancer. It implements LoadBalancer.
|
||||||
type LoadBalancerRR struct {
|
type LoadBalancerRR struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
endpointsMap map[string][]string
|
endpointsMap map[string][]string
|
||||||
@ -42,9 +42,7 @@ func NewLoadBalancerRR() *LoadBalancerRR {
|
|||||||
return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)}
|
return &LoadBalancerRR{endpointsMap: make(map[string][]string), rrIndex: make(map[string]int)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadBalance registers srcAddr for the provided service.
|
// LoadBalance select an endpoint of the service by round-robin algorithm.
|
||||||
// It returns error if no entry is available for the service
|
|
||||||
// or no previous endpoints are registered.
|
|
||||||
func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) {
|
func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string, error) {
|
||||||
impl.lock.RLock()
|
impl.lock.RLock()
|
||||||
endpoints, exists := impl.endpointsMap[service]
|
endpoints, exists := impl.endpointsMap[service]
|
||||||
@ -61,8 +59,8 @@ func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string
|
|||||||
return endpoint, nil
|
return endpoint, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsValid returns true if spec is valid.
|
// isValid returns true if spec is valid.
|
||||||
func (impl LoadBalancerRR) IsValid(spec string) bool {
|
func (impl LoadBalancerRR) isValid(spec string) bool {
|
||||||
index := strings.Index(spec, ":")
|
index := strings.Index(spec, ":")
|
||||||
if index == -1 {
|
if index == -1 {
|
||||||
return false
|
return false
|
||||||
@ -74,11 +72,11 @@ func (impl LoadBalancerRR) IsValid(spec string) bool {
|
|||||||
return value > 0
|
return value > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterValidEndpoints filters out invalid endpoints.
|
// filterValidEndpoints filters out invalid endpoints.
|
||||||
func (impl LoadBalancerRR) FilterValidEndpoints(endpoints []string) []string {
|
func (impl LoadBalancerRR) filterValidEndpoints(endpoints []string) []string {
|
||||||
var result []string
|
var result []string
|
||||||
for _, spec := range endpoints {
|
for _, spec := range endpoints {
|
||||||
if impl.IsValid(spec) {
|
if impl.isValid(spec) {
|
||||||
result = append(result, spec)
|
result = append(result, spec)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -97,7 +95,7 @@ func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
|
|||||||
existingEndpoints, exists := impl.endpointsMap[value.Name]
|
existingEndpoints, exists := impl.endpointsMap[value.Name]
|
||||||
if !exists || !reflect.DeepEqual(value.Endpoints, existingEndpoints) {
|
if !exists || !reflect.DeepEqual(value.Endpoints, existingEndpoints) {
|
||||||
glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints)
|
glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints)
|
||||||
impl.endpointsMap[value.Name] = impl.FilterValidEndpoints(value.Endpoints)
|
impl.endpointsMap[value.Name] = impl.filterValidEndpoints(value.Endpoints)
|
||||||
// Start RR from the beginning if added or updated.
|
// Start RR from the beginning if added or updated.
|
||||||
impl.rrIndex[value.Name] = 0
|
impl.rrIndex[value.Name] = 0
|
||||||
}
|
}
|
||||||
|
@ -24,16 +24,16 @@ import (
|
|||||||
|
|
||||||
func TestLoadBalanceValidateWorks(t *testing.T) {
|
func TestLoadBalanceValidateWorks(t *testing.T) {
|
||||||
loadBalancer := NewLoadBalancerRR()
|
loadBalancer := NewLoadBalancerRR()
|
||||||
if loadBalancer.IsValid("") {
|
if loadBalancer.isValid("") {
|
||||||
t.Errorf("Didn't fail for empty string")
|
t.Errorf("Didn't fail for empty string")
|
||||||
}
|
}
|
||||||
if loadBalancer.IsValid("foobar") {
|
if loadBalancer.isValid("foobar") {
|
||||||
t.Errorf("Didn't fail with no port")
|
t.Errorf("Didn't fail with no port")
|
||||||
}
|
}
|
||||||
if loadBalancer.IsValid("foobar:-1") {
|
if loadBalancer.isValid("foobar:-1") {
|
||||||
t.Errorf("Didn't fail with a negative port")
|
t.Errorf("Didn't fail with a negative port")
|
||||||
}
|
}
|
||||||
if !loadBalancer.IsValid("foobar:8080") {
|
if !loadBalancer.isValid("foobar:8080") {
|
||||||
t.Errorf("Failed a valid config.")
|
t.Errorf("Failed a valid config.")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -41,7 +41,7 @@ func TestLoadBalanceValidateWorks(t *testing.T) {
|
|||||||
func TestLoadBalanceFilterWorks(t *testing.T) {
|
func TestLoadBalanceFilterWorks(t *testing.T) {
|
||||||
loadBalancer := NewLoadBalancerRR()
|
loadBalancer := NewLoadBalancerRR()
|
||||||
endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"}
|
endpoints := []string{"foobar:1", "foobar:2", "foobar:-1", "foobar:3", "foobar:-2"}
|
||||||
filtered := loadBalancer.FilterValidEndpoints(endpoints)
|
filtered := loadBalancer.filterValidEndpoints(endpoints)
|
||||||
|
|
||||||
if len(filtered) != 3 {
|
if len(filtered) != 3 {
|
||||||
t.Errorf("Failed to filter to the correct size")
|
t.Errorf("Failed to filter to the correct size")
|
||||||
|
Loading…
Reference in New Issue
Block a user