Clean up logging, make initial sync faster

This commit is contained in:
Tim Hockin 2015-08-14 21:53:52 -07:00
parent d72892d0b0
commit f1a48574a6
2 changed files with 20 additions and 14 deletions

View File

@ -19,6 +19,7 @@ package config
import ( import (
"sync" "sync"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
@ -91,6 +92,7 @@ func NewEndpointsConfig() *EndpointsConfig {
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
handler.OnEndpointsUpdate(instance.([]api.Endpoints)) handler.OnEndpointsUpdate(instance.([]api.Endpoints))
})) }))
} }
@ -126,19 +128,19 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
update := change.(EndpointsUpdate) update := change.(EndpointsUpdate)
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.V(4).Infof("Adding new endpoint from source %s : %+v", source, update.Endpoints) glog.V(4).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
endpoints[name] = value endpoints[name] = value
} }
case REMOVE: case REMOVE:
glog.V(4).Infof("Removing an endpoint %+v", update) glog.V(4).Infof("Removing an endpoint %s", spew.Sdump(update))
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
delete(endpoints, name) delete(endpoints, name)
} }
case SET: case SET:
glog.V(4).Infof("Setting endpoints %+v", update) glog.V(4).Infof("Setting endpoints %s", spew.Sdump(update))
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
endpoints = make(map[types.NamespacedName]api.Endpoints) endpoints = make(map[types.NamespacedName]api.Endpoints)
for _, value := range update.Endpoints { for _, value := range update.Endpoints {
@ -146,7 +148,7 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
endpoints[name] = value endpoints[name] = value
} }
default: default:
glog.V(4).Infof("Received invalid update type: %v", update) glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
} }
s.endpoints[source] = endpoints s.endpoints[source] = endpoints
s.endpointLock.Unlock() s.endpointLock.Unlock()
@ -189,6 +191,7 @@ func NewServiceConfig() *ServiceConfig {
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
handler.OnServiceUpdate(instance.([]api.Service)) handler.OnServiceUpdate(instance.([]api.Service))
})) }))
} }
@ -224,19 +227,19 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
update := change.(ServiceUpdate) update := change.(ServiceUpdate)
switch update.Op { switch update.Op {
case ADD: case ADD:
glog.V(4).Infof("Adding new service from source %s : %+v", source, update.Services) glog.V(4).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
for _, value := range update.Services { for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
services[name] = value services[name] = value
} }
case REMOVE: case REMOVE:
glog.V(4).Infof("Removing a service %+v", update) glog.V(4).Infof("Removing a service %s", spew.Sdump(update))
for _, value := range update.Services { for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
delete(services, name) delete(services, name)
} }
case SET: case SET:
glog.V(4).Infof("Setting services %+v", update) glog.V(4).Infof("Setting services %s", spew.Sdump(update))
// Clear the old map entries by just creating a new map // Clear the old map entries by just creating a new map
services = make(map[types.NamespacedName]api.Service) services = make(map[types.NamespacedName]api.Service)
for _, value := range update.Services { for _, value := range update.Services {
@ -244,7 +247,7 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
services[name] = value services[name] = value
} }
default: default:
glog.V(4).Infof("Received invalid update type: %v", update) glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
} }
s.services[source] = services s.services[source] = services
s.serviceLock.Unlock() s.serviceLock.Unlock()

View File

@ -33,6 +33,7 @@ import (
"time" "time"
"github.com/coreos/go-semver/semver" "github.com/coreos/go-semver/semver"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy"
@ -232,7 +233,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedServiceUpdate = true proxier.haveReceivedServiceUpdate = true
glog.V(4).Infof("Received service update notice: %+v", allServices)
activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set activeServices := make(map[proxy.ServicePortName]bool) // use a map as a set
for i := range allServices { for i := range allServices {
@ -256,7 +256,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
} }
if exists { if exists {
//Something changed. //Something changed.
glog.V(4).Infof("Something changed for service %q: removing it", serviceName) glog.V(3).Infof("Something changed for service %q: removing it", serviceName)
delete(proxier.serviceMap, serviceName) delete(proxier.serviceMap, serviceName)
} }
@ -273,7 +273,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
info.sessionAffinityType = service.Spec.SessionAffinity info.sessionAffinityType = service.Spec.SessionAffinity
proxier.serviceMap[serviceName] = info proxier.serviceMap[serviceName] = info
glog.V(4).Infof("info: %+v", info) glog.V(4).Infof("added serviceInfo(%s): %s", serviceName, spew.Sdump(info))
} }
} }
@ -297,7 +297,6 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedEndpointsUpdate = true proxier.haveReceivedEndpointsUpdate = true
glog.V(4).Infof("Received endpoints update notice: %+v", allEndpoints)
registeredEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set registeredEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
// Update endpoints for services. // Update endpoints for services.
@ -349,6 +348,10 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.serviceMap[service].endpoints = nil proxier.serviceMap[service].endpoints = nil
} }
} }
if err := proxier.syncProxyRules(); err != nil {
glog.Errorf("Failed to sync iptables rules: %v", err)
}
} }
// used in OnEndpointsUpdate // used in OnEndpointsUpdate
@ -409,10 +412,10 @@ func servicePortAndEndpointToServiceChain(s proxy.ServicePortName, protocol stri
func (proxier *Proxier) syncProxyRules() error { func (proxier *Proxier) syncProxyRules() error {
// don't sync rules till we've received services and endpoints // don't sync rules till we've received services and endpoints
if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate {
glog.V(2).Info("not syncing iptables until Services and Endpoints have been received from master") glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return nil return nil
} }
glog.V(4).Infof("Syncing iptables rules") glog.V(3).Infof("Syncing iptables rules")
// Ensure main chains and rules are installed. // Ensure main chains and rules are installed.
inputChains := []utiliptables.Chain{utiliptables.ChainOutput, utiliptables.ChainPrerouting} inputChains := []utiliptables.Chain{utiliptables.ChainOutput, utiliptables.ChainPrerouting}