save allServices in prep for async iptables

This commit is contained in:
Tim Hockin 2017-04-02 22:01:02 -07:00
parent 4d8ffb23ef
commit af9a5321b5
2 changed files with 27 additions and 27 deletions

View File

@ -197,13 +197,14 @@ type Proxier struct {
serviceMap proxyServiceMap serviceMap proxyServiceMap
endpointsMap proxyEndpointMap endpointsMap proxyEndpointMap
portsMap map[localPort]closeable portsMap map[localPort]closeable
haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event // allServices and allEndpoints should never be modified by proxier - the
// allEndpoints should never be modified by proxier - the pointers // pointers are shared with higher layers of kube-proxy. They are guaranteed
// are shared with higher layers of kube-proxy. They are guaranteed // to not be modified in the meantime, but also require to be not modified
// to not be modified in the meantime, but also require to be not // by Proxier.
// modified by Proxier. // nil until we have seen an On*Update event.
// nil until we have seen an OnEndpointsUpdate event. allServices []*api.Service
allEndpoints []*api.Endpoints allEndpoints []*api.Endpoints
throttle flowcontrol.RateLimiter throttle flowcontrol.RateLimiter
// These are effectively const and do not need the mutex to be held. // These are effectively const and do not need the mutex to be held.
@ -529,13 +530,12 @@ func buildServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap)
// OnServiceUpdate tracks the active set of service proxies. // OnServiceUpdate tracks the active set of service proxies.
// They will be synchronized using syncProxyRules() // They will be synchronized using syncProxyRules()
func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) { func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) {
start := time.Now()
defer func() {
glog.V(4).Infof("OnServiceUpdate took %v for %d services", time.Since(start), len(allServices))
}()
proxier.mu.Lock() proxier.mu.Lock()
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedServiceUpdate = true if proxier.allServices == nil {
glog.V(2).Info("Received first Services update")
}
proxier.allServices = allServices
newServiceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap) newServiceMap, hcAdd, hcDel, staleUDPServices := buildServiceMap(allServices, proxier.serviceMap)
for _, hc := range hcAdd { for _, hc := range hcAdd {
@ -769,7 +769,7 @@ func (proxier *Proxier) syncProxyRules() {
glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) glog.V(4).Infof("syncProxyRules took %v", time.Since(start))
}() }()
// don't sync rules till we've received services and endpoints // don't sync rules till we've received services and endpoints
if proxier.allEndpoints == nil || !proxier.haveReceivedServiceUpdate { if proxier.allEndpoints == nil || proxier.allServices == nil {
glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master")
return return
} }

View File

@ -370,7 +370,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
iptables: ipt, iptables: ipt,
clusterCIDR: "10.0.0.0/24", clusterCIDR: "10.0.0.0/24",
allEndpoints: []*api.Endpoints{}, allEndpoints: []*api.Endpoints{},
haveReceivedServiceUpdate: true, allServices: []*api.Service{},
hostname: testHostname, hostname: testHostname,
portsMap: make(map[localPort]closeable), portsMap: make(map[localPort]closeable),
portMapper: &fakePortOpener{[]*localPort{}}, portMapper: &fakePortOpener{[]*localPort{}},