[kube-proxy] Mass service/endpoint info functions rename and comments

This commit is contained in:
Zihong Zheng
2018-02-24 13:32:55 -08:00
parent 06064498de
commit f6eed81f21
9 changed files with 287 additions and 292 deletions

View File

@@ -310,7 +310,7 @@ func NewProxier(ipt utiliptables.Interface,
proxier := &Proxier{
portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable),
serviceMap: make(proxy.ServiceMap),
serviceChanges: proxy.NewServiceChangeTracker(customizeServiceInfo, &isIPv6, recorder),
serviceChanges: proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
endpointsMap: make(proxy.EndpointsMap),
endpointsChanges: proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
syncPeriod: syncPeriod,
@@ -354,14 +354,14 @@ func NewProxier(ipt utiliptables.Interface,
// internal struct for string service information
type serviceInfo struct {
*proxy.ServiceInfoCommon
*proxy.BaseServiceInfo
// The following fields are computed and stored for performance reasons.
serviceNameString string
}
// returns a new proxy.ServicePort which abstracts a serviceInfo
func customizeServiceInfo(port *api.ServicePort, service *api.Service, infoCommon *proxy.ServiceInfoCommon) proxy.ServicePort {
info := &serviceInfo{ServiceInfoCommon: infoCommon}
func newServiceInfo(port *api.ServicePort, service *api.Service, baseInfo *proxy.BaseServiceInfo) proxy.ServicePort {
info := &serviceInfo{BaseServiceInfo: baseInfo}
// Store the following for performance reasons.
svcName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
@@ -552,9 +552,7 @@ func (proxier *Proxier) isInitialized() bool {
// OnServiceAdd is called whenever creation of new service object is observed.
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
if proxier.serviceChanges.Update(nil, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
proxier.OnServiceUpdate(nil, service)
}
// OnServiceUpdate is called whenever modification of an existing service object is observed.
@@ -566,9 +564,7 @@ func (proxier *Proxier) OnServiceUpdate(oldService, service *api.Service) {
// OnServiceDelete is called whenever deletion of an existing service object is observed.
func (proxier *Proxier) OnServiceDelete(service *api.Service) {
if proxier.serviceChanges.Update(service, nil) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
proxier.OnServiceUpdate(service, nil)
}
// OnServiceSynced is called once all the initial even handlers were called and the state is fully propagated to local cache.
@@ -584,9 +580,7 @@ func (proxier *Proxier) OnServiceSynced() {
// OnEndpointsAdd is called whenever creation of new endpoints object is observed.
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
if proxier.endpointsChanges.Update(nil, endpoints) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
proxier.OnEndpointsUpdate(nil, endpoints)
}
// OnEndpointsUpdate is called whenever modification of an existing endpoints object is observed.
@@ -598,9 +592,7 @@ func (proxier *Proxier) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints
// OnEndpointsDelete is called whenever deletion of an existing endpoints object is observed.
func (proxier *Proxier) OnEndpointsDelete(endpoints *api.Endpoints) {
if proxier.endpointsChanges.Update(endpoints, nil) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
proxier.OnEndpointsUpdate(endpoints, nil)
}
// OnEndpointsSynced is called once all the initial event handlers were called and the state is fully propagated to local cache.
@@ -642,8 +634,8 @@ func (proxier *Proxier) syncProxyRules() {
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.GetProtocol() == api.ProtocolUDP {
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.GetClusterIP())
staleServices.Insert(svcInfo.GetClusterIP())
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.ClusterIPString())
staleServices.Insert(svcInfo.ClusterIPString())
}
}
@@ -759,9 +751,9 @@ func (proxier *Proxier) syncProxyRules() {
// Handle traffic that loops back to the originator with SNAT.
for _, e := range proxier.endpointsMap[svcName] {
ep, ok := e.(*proxy.EndpointInfoCommon)
ep, ok := e.(*proxy.BaseEndpointInfo)
if !ok {
glog.Errorf("Failed to cast EndpointInfoCommon %q", e.String())
glog.Errorf("Failed to cast BaseEndpointInfo %q", e.String())
continue
}
epIP := ep.IP()
@@ -1269,7 +1261,7 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
for _, epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == api.ProtocolUDP {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.GetClusterIP(), endpointIP, clientv1.ProtocolUDP)
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, clientv1.ProtocolUDP)
if err != nil {
glog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
}