Merge branch 'master' into fix/golint

Conflicts:
	pkg/master/master.go
	pkg/master/pod_cache.go
	pkg/proxy/config/file.go
	pkg/proxy/proxier.go
	pkg/proxy/roundrobbin.go
	pkg/scheduler/randomfit.go
	pkg/scheduler/randomfit_test.go
This commit is contained in:
Yuki Yugui Sonoda
2014-07-15 20:18:21 +09:00
63 changed files with 1885 additions and 292 deletions

View File

@@ -81,35 +81,38 @@ func (impl ConfigSourceFile) Run() {
data, err := ioutil.ReadFile(impl.filename)
if err != nil {
glog.Errorf("Couldn't read file: %s : %v", impl.filename, err)
} else {
var config serviceConfig
err = json.Unmarshal(data, &config)
if err != nil {
glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err)
} else {
if !bytes.Equal(lastData, data) {
lastData = data
// Ok, we have a valid configuration, send to channel for
// rejiggering.
newServices := make([]api.Service, len(config.Services))
newEndpoints := make([]api.Endpoints, len(config.Services))
for i, service := range config.Services {
newServices[i] = api.Service{JSONBase: api.JSONBase{ID: service.Name}, Port: service.Port}
newEndpoints[i] = api.Endpoints{Name: service.Name, Endpoints: service.Endpoints}
}
if !reflect.DeepEqual(lastServices, newServices) {
serviceUpdate := ServiceUpdate{Op: SET, Services: newServices}
impl.serviceChannel <- serviceUpdate
lastServices = newServices
}
if !reflect.DeepEqual(lastEndpoints, newEndpoints) {
endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: newEndpoints}
impl.endpointsChannel <- endpointsUpdate
lastEndpoints = newEndpoints
}
}
}
continue
}
if bytes.Equal(lastData, data) {
continue
}
lastData = data
config := &serviceConfig{}
if err = json.Unmarshal(data, config); err != nil {
glog.Errorf("Couldn't unmarshal configuration from file : %s %v", data, err)
continue
}
// Ok, we have a valid configuration, send to channel for
// rejiggering.
newServices := make([]api.Service, len(config.Services))
newEndpoints := make([]api.Endpoints, len(config.Services))
for i, service := range config.Services {
newServices[i] = api.Service{JSONBase: api.JSONBase{ID: service.Name}, Port: service.Port}
newEndpoints[i] = api.Endpoints{Name: service.Name, Endpoints: service.Endpoints}
}
if !reflect.DeepEqual(lastServices, newServices) {
serviceUpdate := ServiceUpdate{Op: SET, Services: newServices}
impl.serviceChannel <- serviceUpdate
lastServices = newServices
}
if !reflect.DeepEqual(lastEndpoints, newEndpoints) {
endpointsUpdate := EndpointsUpdate{Op: SET, Endpoints: newEndpoints}
impl.endpointsChannel <- endpointsUpdate
lastEndpoints = newEndpoints
}
time.Sleep(5 * time.Second)
}
}

View File

@@ -43,7 +43,7 @@ func copyBytes(in, out *net.TCPConn) {
glog.Infof("Copying from %v <-> %v <-> %v <-> %v",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
_, err := io.Copy(in, out)
if err != nil && err != io.EOF {
if err != nil {
glog.Errorf("I/O error: %v", err)
}
@@ -88,7 +88,7 @@ func (proxier Proxier) AcceptHandler(service string, listener net.Listener) {
inConn.Close()
continue
}
go proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
proxyConnection(inConn.(*net.TCPConn), outConn.(*net.TCPConn))
}
}

View File

@@ -23,7 +23,6 @@ import (
"net"
"reflect"
"strconv"
"strings"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -59,13 +58,12 @@ func (impl LoadBalancerRR) LoadBalance(service string, srcAddr net.Addr) (string
return endpoint, nil
}
// isValid returns true if spec is valid.
func (impl LoadBalancerRR) isValid(spec string) bool {
index := strings.Index(spec, ":")
if index == -1 {
_, port, err := net.SplitHostPort(spec)
if err != nil {
return false
}
value, err := strconv.Atoi(spec[index+1:])
value, err := strconv.Atoi(port)
if err != nil {
return false
}
@@ -93,9 +91,10 @@ func (impl LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
// First update / add all new endpoints for services.
for _, value := range endpoints {
existingEndpoints, exists := impl.endpointsMap[value.Name]
if !exists || !reflect.DeepEqual(value.Endpoints, existingEndpoints) {
validEndpoints := impl.filterValidEndpoints(value.Endpoints)
if !exists || !reflect.DeepEqual(existingEndpoints, validEndpoints) {
glog.Infof("LoadBalancerRR: Setting endpoints for %s to %+v", value.Name, value.Endpoints)
impl.endpointsMap[value.Name] = impl.filterValidEndpoints(value.Endpoints)
impl.endpointsMap[value.Name] = validEndpoints
// Start RR from the beginning if added or updated.
impl.rrIndex[value.Name] = 0
}